3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
21 """Utility functions shared between the file and sqlite datastore stubs.
23 This module is internal and should not be used by client applications.
35 _MD5_FUNC
= hashlib
.md5
51 from google
.net
.proto
import ProtocolBuffer
52 from google
.appengine
.datastore
import entity_pb
54 from google
.appengine
.api
import api_base_pb
55 from google
.appengine
.api
import apiproxy_stub_map
56 from google
.appengine
.api
import datastore_admin
57 from google
.appengine
.api
import datastore_errors
58 from google
.appengine
.api
import datastore_types
59 from google
.appengine
.api
.taskqueue
import taskqueue_service_pb
60 from google
.appengine
.datastore
import datastore_index
61 from google
.appengine
.datastore
import datastore_pb
62 from google
.appengine
.datastore
import datastore_pbs
63 from google
.appengine
.datastore
import datastore_query
64 from google
.appengine
.datastore
import datastore_stub_index
65 from google
.appengine
.datastore
import datastore_v4_pb
66 from google
.appengine
.runtime
import apiproxy_errors
# Query-related limits.
# NOTE(review): the code enforcing these is outside this section; the names
# suggest caps on result count, result size in bytes and query offset --
# confirm at the use sites.
_MAXIMUM_RESULTS = 300
_MAXIMUM_QUERY_RESULT_BYTES = 2000000
_MAX_QUERY_OFFSET = 1000

# Maps entity_pb.PropertyValue field tags to type-name strings.
# NOTE(review): confirm that this covers every tag the consumers expect.
_PROPERTY_TYPE_NAMES = {
    entity_pb.PropertyValue.kint64Value: 'INT64',
    entity_pb.PropertyValue.kbooleanValue: 'BOOLEAN',
    entity_pb.PropertyValue.kstringValue: 'STRING',
    entity_pb.PropertyValue.kdoubleValue: 'DOUBLE',
    entity_pb.PropertyValue.kPointValueGroup: 'POINT',
    entity_pb.PropertyValue.kUserValueGroup: 'USER',
    entity_pb.PropertyValue.kReferenceValueGroup: 'REFERENCE',
}

# Threshold for the 16-bit key hash computed in _GetScatterProperty: hashes at
# or above this value yield no scatter property.
_SCATTER_PROPORTION = 32768

# Property meanings that CheckProperty / CheckPropertyValue treat as blobs.
_BLOB_MEANINGS = frozenset((entity_pb.Property.BLOB,
                            entity_pb.Property.ENTITY_PROTO,
                            entity_pb.Property.TEXT))
# Retry timing parameters (delays in milliseconds, per the _MS suffix).
# NOTE(review): the retry loop that consumes these is outside this section.
_INITIAL_RETRY_DELAY_MS = 100
_RETRY_DELAY_MULTIPLIER = 2
_MAX_RETRY_DELAY_MS = 120000

# String labels for the two ID policies.
# NOTE(review): presumably selected by the ID allocator -- confirm.
SEQUENTIAL = 'sequential'
SCATTERED = 'scattered'

# Sequential IDs use the low _MAX_SEQUENTIAL_BIT bits.
_MAX_SEQUENTIAL_BIT = 52
_MAX_SEQUENTIAL_COUNTER = 2 ** _MAX_SEQUENTIAL_BIT - 1
_MAX_SEQUENTIAL_ID = _MAX_SEQUENTIAL_COUNTER

# The scattered counter has one bit less than the sequential one, and the
# scattered ID range is defined to start right after the sequential range.
_MAX_SCATTERED_COUNTER = 2 ** (_MAX_SEQUENTIAL_BIT - 1) - 1
_MAX_SCATTERED_ID = _MAX_SEQUENTIAL_ID + _MAX_SCATTERED_COUNTER + 1

# 64 - _MAX_SEQUENTIAL_BIT + 1, written as a single subtraction.
_SCATTER_SHIFT = 65 - _MAX_SEQUENTIAL_BIT
def _GetScatterProperty(entity_proto):
  """Gets the scatter property for an object.

  For ease of implementation, this is not synchronized with the actual
  value on the App Engine server, but should work equally well.

  Note: This property may change, either here or in production. No client
  other than the mapper framework should rely on it directly.

  Args:
    entity_proto: the entity whose key path is hashed. Only
      key().path().element_list() is read from it.

  Returns:
    An entity_pb.Property holding the scatter value, or None if this entity
    should not have a scatter property.
  """
  hash_obj = _MD5_FUNC()
  for element in entity_proto.key().path().element_list():
    # Each path element contributes its name if it has one, else its id.
    if element.has_name():
      hash_obj.update(element.name())
    elif element.has_id():
      hash_obj.update(str(element.id()))

  # Only the first two digest bytes are used, read as one unsigned short in
  # native byte order.
  hash_bytes = hash_obj.digest()[0:2]
  (hash_int,) = struct.unpack('H', hash_bytes)

  # Hashes at or above the threshold get no scatter property.
  if hash_int >= _SCATTER_PROPORTION:
    return None

  scatter_property = entity_pb.Property()
  scatter_property.set_name(datastore_types.SCATTER_SPECIAL_PROPERTY)
  scatter_property.set_meaning(entity_pb.Property.BYTESTRING)
  scatter_property.set_multiple(False)
  property_value = scatter_property.mutable_value()
  property_value.set_stringvalue(hash_bytes)
  return scatter_property
# Maps each special property name to (is_visible, is_stored, property_func):
#   is_visible: whether the property is computed when an entity is loaded.
#   is_stored: whether the property is computed when an entity is stored.
#   property_func: called with the entity proto; returns the property to
#     append, or a false value when the entity gets none.
_SPECIAL_PROPERTY_MAP = {
    datastore_types.SCATTER_SPECIAL_PROPERTY: (False, True,
                                               _GetScatterProperty),
}
def GetInvisibleSpecialPropertyNames():
  """Gets the names of all non user-visible special properties.

  Returns:
    A list of the names in _SPECIAL_PROPERTY_MAP whose is_visible flag (the
    first element of the mapped tuple) is false.
  """
  invisible_names = []
  for name, value in _SPECIAL_PROPERTY_MAP.items():
    is_visible, _, _ = value
    if not is_visible:
      invisible_names.append(name)
  return invisible_names
def _PrepareSpecialProperties(entity_proto, is_load):
  """Computes special properties for loading or storing.

  Strips other special properties.

  Args:
    entity_proto: the entity to modify in place.
    is_load: True to compute the properties flagged is_visible, False to
      compute the ones flagged is_stored.
  """
  # Walk backwards so deleting an element does not shift the indexes that are
  # still to be visited.
  for i in xrange(entity_proto.property_size() - 1, -1, -1):
    if entity_proto.property(i).name() in _SPECIAL_PROPERTY_MAP:
      del entity_proto.property_list()[i]

  for is_visible, is_stored, property_func in _SPECIAL_PROPERTY_MAP.values():
    if is_load:
      should_process = is_visible
    else:
      should_process = is_stored

    if should_process:
      special_property = property_func(entity_proto)
      # property_func may return None (see _GetScatterProperty); never append
      # that to the property list.
      if special_property:
        entity_proto.property_list().append(special_property)
def _GetGroupByKey(entity, property_names):
  """Computes a key value that uniquely identifies the 'group' of an entity.

  Args:
    entity: The entity_pb.EntityProto for which to create the group key.
    property_names: The names of the properties in the group by clause.

  Returns:
    A hashable value that uniquely identifies the entity's 'group': a
    frozenset of (name, serialized value) pairs, one per matching property.
  """
  group_pairs = []
  for candidate in entity.property_list():
    if candidate.name() in property_names:
      serialized = candidate.value().SerializeToString()
      group_pairs.append((candidate.name(), serialized))
  return frozenset(group_pairs)
def PrepareSpecialPropertiesForStore(entity_proto):
  """Recomputes the special properties of an entity about to be stored.

  Any special property already present on the entity is stripped first.

  Args:
    entity_proto: the entity to modify in place.
  """
  _PrepareSpecialProperties(entity_proto, is_load=False)
def LoadEntity(entity, keys_only=False, property_names=None):
  """Prepares an entity to be returned to the user.

  Args:
    entity: a entity_pb.EntityProto or None
    keys_only: if a keys only result should be produced
    property_names: if not None or empty, cause a projected entity
      to be produced with the given properties.

  Returns:
    A user friendly copy of entity or None.

  Raises:
    apiproxy_errors.ApplicationError: (via Check) if a projected property
      occurs more than once on the entity.
  """
  if not entity:
    return None

  clone = entity_pb.EntityProto()
  if property_names:
    # Projection: copy the key plus only the requested properties.
    clone.mutable_key().CopyFrom(entity.key())
    clone.mutable_entity_group()
    seen = set()
    for prop in entity.property_list():
      if prop.name() in property_names:
        # A projected property may appear at most once in a result.
        Check(prop.name() not in seen,
              "datastore dev stub produced bad result",
              datastore_pb.Error.INTERNAL_ERROR)
        seen.add(prop.name())
        new_prop = clone.add_property()
        new_prop.set_name(prop.name())
        new_prop.set_meaning(entity_pb.Property.INDEX_VALUE)
        new_prop.mutable_value().CopyFrom(prop.value())
        new_prop.set_multiple(False)
  elif keys_only:
    # Keys-only: copy the key and leave the property list empty.
    clone.mutable_key().CopyFrom(entity.key())
    clone.mutable_entity_group()
  else:
    # Full entity: copy everything, then recompute the visible special
    # properties.
    clone.CopyFrom(entity)
    PrepareSpecialPropertiesForLoad(clone)
  return clone
def StoreEntity(entity):
  """Prepares an entity for storing.

  Args:
    entity: a entity_pb.EntityProto to prepare

  Returns:
    A copy of entity that should be stored in its place.
  """
  clone = entity_pb.EntityProto()
  clone.CopyFrom(entity)

  # Special properties are recomputed on the copy; the caller's entity is
  # left untouched.
  PrepareSpecialPropertiesForStore(clone)

  return clone
def PrepareSpecialPropertiesForLoad(entity_proto):
  """Recomputes the user-visible special properties of a loaded entity.

  Any special property already present on the entity is stripped first.

  Args:
    entity_proto: the entity to modify in place.
  """
  _PrepareSpecialProperties(entity_proto, is_load=True)
def Check(test, msg='', error_code=datastore_pb.Error.BAD_REQUEST):
  """Raises an apiproxy_errors.ApplicationError if the condition is false.

  Args:
    test: A condition to test.
    msg: A string to return with the error.
    error_code: One of datastore_pb.Error to use as an error code.

  Raises:
    apiproxy_errors.ApplicationError: If test is false.
  """
  if not test:
    raise apiproxy_errors.ApplicationError(error_code, msg)
def CheckValidUTF8(string, desc):
  """Check that the given string is valid UTF-8.

  Args:
    string: the string to validate.
    desc: a description of the string being validated.

  Raises:
    apiproxy_errors.ApplicationError: if the string is not valid UTF-8.
  """
  try:
    string.decode('utf-8')
  except UnicodeDecodeError:
    # Report the failure through Check so callers see an ApplicationError.
    Check(False, '%s is not valid UTF-8.' % desc)
def CheckAppId(request_trusted, request_app_id, app_id):
  """Check that this is the stub for app_id.

  Args:
    request_trusted: If the request is trusted.
    request_app_id: The application ID of the app making the request.
    app_id: An application ID.

  Raises:
    apiproxy_errors.ApplicationError: if this is not the stub for app_id.
  """
  CheckValidUTF8(app_id, "app id")
  # A trusted request may access any app; otherwise the IDs must match.
  Check(request_trusted or app_id == request_app_id,
        'app "%s" cannot access app "%s"\'s data' % (request_app_id, app_id))
def CheckReference(request_trusted,
                   request_app_id,
                   key,
                   require_id_or_name=True):
  """Check that a key is valid for this request.

  Args:
    request_trusted: If the request is trusted.
    request_app_id: The application ID of the app making the request.
    key: entity_pb.Reference
    require_id_or_name: Boolean indicating if we should enforce the presence of
      an id or name in the last element of the key's path.

  Raises:
    apiproxy_errors.ApplicationError: if the key is invalid
    datastore_errors.BadRequestError: if require_id_or_name is set and the
      last path element has neither a non-zero id nor a non-empty name.
  """
  assert isinstance(key, entity_pb.Reference)

  CheckAppId(request_trusted, request_app_id, key.app())

  Check(key.path().element_size() > 0, 'key\'s path cannot be empty')

  if require_id_or_name:
    # Only the last path element is inspected; an id of 0 or an empty name
    # counts as absent.
    last_element = key.path().element_list()[-1]
    has_id_or_name = ((last_element.has_id() and last_element.id() != 0) or
                      (last_element.has_name() and last_element.name() != ""))
    if not has_id_or_name:
      raise datastore_errors.BadRequestError('missing key id/name')

  for elem in key.path().element_list():
    Check(not elem.has_id() or not elem.has_name(),
          'each key path element should have id or name but not both: %r' % key)
    CheckValidUTF8(elem.type(), 'key path element type')
    # Only validate a name that is actually set.
    if elem.has_name():
      CheckValidUTF8(elem.name(), 'key path element name')
def CheckEntity(request_trusted, request_app_id, entity):
  """Validates that an entity may be stored.

  The key is checked without requiring an id or name on its last element,
  then every indexed property and every raw property is checked.

  Args:
    request_trusted: If the request is trusted.
    request_app_id: The application ID of the app making the request.
    entity: entity_pb.EntityProto

  Raises:
    apiproxy_errors.ApplicationError: if the entity is invalid
  """
  CheckReference(request_trusted, request_app_id, entity.key(),
                 require_id_or_name=False)

  for indexed_prop in entity.property_list():
    CheckProperty(request_trusted, request_app_id, indexed_prop, indexed=True)

  for raw_prop in entity.raw_property_list():
    CheckProperty(request_trusted, request_app_id, raw_prop, indexed=False)
def CheckProperty(request_trusted, request_app_id, prop, indexed=True):
  """Check if this property can be stored.

  Args:
    request_trusted: If the request is trusted.
    request_app_id: The application ID of the app making the request.
    prop: entity_pb.Property
    indexed: Whether the property is indexed.

  Raises:
    apiproxy_errors.ApplicationError: if the property is invalid
  """
  name = prop.name()
  value = prop.value()
  meaning = prop.meaning()
  CheckValidUTF8(name, 'property name')
  # Reserved property names may only be written by trusted requests.
  Check(request_trusted or
        not datastore_types.RESERVED_PROPERTY_NAME.match(name),
        'cannot store entity with reserved property name \'%s\'' % name)
  # INDEX_VALUE is the meaning LoadEntity puts on projected properties.
  Check(meaning != entity_pb.Property.INDEX_VALUE,
        'Entities with incomplete properties cannot be written.')

  is_blob = meaning in _BLOB_MEANINGS
  if indexed:
    # Blob-like meanings are only allowed on raw (unindexed) properties.
    Check(not is_blob,
          'BLOB, ENTITY_PROTO or TEXT property ' + name +
          ' must be in a raw_property field')
    max_length = datastore_types._MAX_STRING_LENGTH
  else:
    if is_blob:
      Check(value.has_stringvalue(),
            'BLOB / ENTITY_PROTO / TEXT raw property ' + name +
            ' must have a string value')
    max_length = datastore_types._MAX_RAW_PROPERTY_BYTES
  # Links get their own length limit whether indexed or not.
  if meaning == entity_pb.Property.ATOM_LINK:
    max_length = datastore_types._MAX_LINK_PROPERTY_LENGTH

  CheckPropertyValue(name, value, max_length, meaning)
def CheckPropertyValue(name, value, max_length, meaning):
  """Validates that a property value may be stored.

  Args:
    name: name of the property
    value: entity_pb.PropertyValue
    max_length: maximum length for string values
    meaning: meaning of the property

  Raises:
    apiproxy_errors.ApplicationError: if the property is invalid
  """
  presence_flags = (
      value.has_int64value(),
      value.has_stringvalue(),
      value.has_booleanvalue(),
      value.has_doublevalue(),
      value.has_pointvalue(),
      value.has_uservalue(),
      value.has_referencevalue(),
  )
  Check(sum(presence_flags) <= 1,
        'PropertyValue for ' + name + ' has multiple value fields set')

  # Everything below applies to string values only.
  if not value.has_stringvalue():
    return

  # The length is measured on the UTF-16 encoding; the two bytes subtracted
  # are the byte-order mark that the 'utf-16' codec prepends.
  s16 = value.stringvalue().decode('utf-8', 'replace').encode('utf-16')
  Check((len(s16) - 2) / 2 <= max_length,
        'Property %s is too long. Maximum length is %d.' % (name, max_length))

  # Blob-like and BYTESTRING values are exempt from the UTF-8 check.
  is_text = (meaning not in _BLOB_MEANINGS and
             meaning != entity_pb.Property.BYTESTRING)
  if is_text:
    CheckValidUTF8(value.stringvalue(), 'String property value')
def CheckTransaction(request_trusted, request_app_id, transaction):
  """Validates that a transaction belongs to an app this request may access.

  Args:
    request_trusted: If the request is trusted.
    request_app_id: The application ID of the app making the request.
    transaction: datastore_pb.Transaction

  Raises:
    apiproxy_errors.ApplicationError: if the transaction is not valid.
  """
  assert isinstance(transaction, datastore_pb.Transaction)
  transaction_app = transaction.app()
  CheckAppId(request_trusted, request_app_id, transaction_app)
def CheckQuery(query, filters, orders, max_query_components):
  """Check a datastore query with normalized filters, orders.

  Raises an ApplicationError when any of the following conditions are violated:
  - transactional queries have an ancestor
  - queries that are not too large
    (sum of filters, orders, ancestor <= max_query_components)
  - ancestor (if any) app and namespace match query app and namespace
  - kindless queries only filter on __key__ and only sort on __key__ ascending
  - multiple inequality (<, <=, >, >=) filters all applied to the same property
  - filters on __key__ compare to a reference in the same app and namespace as
    the query
  - if an inequality filter on prop X is used, the first order (if any) must
    be on X

  Args:
    query: query to validate
    filters: normalized (by datastore_index.Normalize) filters from query
    orders: normalized (by datastore_index.Normalize) orders from query
    max_query_components: limit on query complexity

  Raises:
    apiproxy_errors.ApplicationError: (via Check) if the query is invalid.
  """
  Check(query.property_name_size() == 0 or not query.keys_only(),
        'projection and keys_only cannot both be set')

  projected_properties = set(query.property_name_list())
  for prop_name in query.property_name_list():
    Check(not datastore_types.RESERVED_PROPERTY_NAME.match(prop_name),
          'projections are not supported for the property: ' + prop_name)
  # The set drops duplicates, so a size mismatch means a repeated name.
  Check(len(projected_properties) == len(query.property_name_list()),
        "cannot project a property multiple times")

  key_prop_name = datastore_types.KEY_SPECIAL_PROPERTY
  unapplied_log_timestamp_us_name = (
      datastore_types._UNAPPLIED_LOG_TIMESTAMP_SPECIAL_PROPERTY)

  if query.has_transaction():
    Check(query.has_ancestor(),
          'Only ancestor queries are allowed inside transactions.')

  # The ancestor counts as one component next to the filters and orders.
  num_components = len(filters) + len(orders)
  if query.has_ancestor():
    num_components += 1
  Check(num_components <= max_query_components,
        'query is too large. may not have more than %s filters'
        ' + sort orders ancestor total' % max_query_components)

  if query.has_ancestor():
    ancestor = query.ancestor()
    Check(query.app() == ancestor.app(),
          'query app is %s but ancestor app is %s' %
          (query.app(), ancestor.app()))
    Check(query.name_space() == ancestor.name_space(),
          'query namespace is %s but ancestor namespace is %s' %
          (query.name_space(), ancestor.name_space()))

  if query.group_by_property_name_size():
    group_by_set = set(query.group_by_property_name_list())
    # The leading orders must consume the group by properties; once they are
    # all used up the remaining orders are unconstrained.
    for order in orders:
      if not group_by_set:
        break
      # NOTE(review): the tail of this message was not visible in the reviewed
      # section; confirm the exact wording.
      Check(order.property() in group_by_set,
            'items in the group by clause must be specified first '
            'in the ordering')
      group_by_set.remove(order.property())

  ineq_prop_name = None
  for query_filter in filters:
    Check(query_filter.property_size() == 1,
          'Filter has %d properties, expected 1' %
          query_filter.property_size())

    prop = query_filter.property(0)
    prop_name = prop.name().decode('utf-8')

    if prop_name == key_prop_name:
      # __key__ filters must compare against a key from the same app and
      # namespace as the query.
      Check(prop.value().has_referencevalue(),
            '%s filter value must be a Key' % key_prop_name)
      ref_val = prop.value().referencevalue()
      Check(ref_val.app() == query.app(),
            '%s filter app is %s but query app is %s' %
            (key_prop_name, ref_val.app(), query.app()))
      Check(ref_val.name_space() == query.name_space(),
            '%s filter namespace is %s but query namespace is %s' %
            (key_prop_name, ref_val.name_space(), query.name_space()))

    if query_filter.op() in datastore_index.EQUALITY_OPERATORS:
      Check(prop_name not in projected_properties,
            'cannot use projection on a property with an equality filter')
    if (query_filter.op() in datastore_index.INEQUALITY_OPERATORS and
        prop_name != unapplied_log_timestamp_us_name):
      if ineq_prop_name is None:
        ineq_prop_name = prop_name
      else:
        Check(ineq_prop_name == prop_name,
              'Only one inequality filter per query is supported. '
              'Encountered both %s and %s' % (ineq_prop_name, prop_name))

  # With explicit orders, the order checks above and below already tie the
  # inequality property to the group by clause; only the order-less case
  # needs its own check. NOTE(review): confirm the `not orders` condition.
  if (ineq_prop_name is not None
      and query.group_by_property_name_size() > 0
      and not orders):
    Check(ineq_prop_name in group_by_set,
          'Inequality filter on %s must also be a group by '
          'property when group by properties are set.'
          % ineq_prop_name)

  if ineq_prop_name is not None and orders:
    first_order_prop = orders[0].property().decode('utf-8')
    Check(first_order_prop == ineq_prop_name,
          'The first sort property must be the same as the property '
          'to which the inequality filter is applied.  In your query '
          'the first sort property is %s but the inequality filter '
          'is on %s' % (first_order_prop, ineq_prop_name))

  if not query.has_kind():
    # Kindless queries may only filter on and sort by __key__.
    for query_filter in filters:
      prop_name = query_filter.property(0).name().decode('utf-8')
      Check(prop_name == key_prop_name or
            prop_name == unapplied_log_timestamp_us_name,
            'kind is required for non-__key__ filters')
    for order in orders:
      prop_name = order.property().decode('utf-8')
      # Compare the enum by value; identity of integers is an implementation
      # detail.
      Check(prop_name == key_prop_name and
            order.direction() == datastore_pb.Query_Order.ASCENDING,
            'kind is required for all orders except __key__ ascending')
class ValueRange(object):
  """A range of values defined by its two extremes (inclusive or exclusive)."""

  def __init__(self):
    """Constructor.

    Creates an unlimited range.
    """
    # None means "no limit on this side".
    self.__start = self.__end = None
    self.__start_inclusive = self.__end_inclusive = False

  def Update(self, rel_op, limit):
    """Filter the range by 'rel_op limit'.

    The range only ever narrows: a limit that is looser than the current
    extreme is ignored. EQUAL constrains both ends.

    Args:
      rel_op: relational operator from datastore_pb.Query_Filter.
      limit: the value to limit the range by.
    """
    if rel_op == datastore_pb.Query_Filter.LESS_THAN:
      # '<=' so that an exclusive bound replaces an inclusive one at the same
      # value.
      if self.__end is None or limit <= self.__end:
        self.__end = limit
        self.__end_inclusive = False
    elif (rel_op == datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL or
          rel_op == datastore_pb.Query_Filter.EQUAL):
      if self.__end is None or limit < self.__end:
        self.__end = limit
        self.__end_inclusive = True

    # Deliberately a separate 'if': EQUAL must also update the start.
    if rel_op == datastore_pb.Query_Filter.GREATER_THAN:
      if self.__start is None or limit >= self.__start:
        self.__start = limit
        self.__start_inclusive = False
    elif (rel_op == datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL or
          rel_op == datastore_pb.Query_Filter.EQUAL):
      if self.__start is None or limit > self.__start:
        self.__start = limit
        self.__start_inclusive = True

  def Contains(self, value):
    """Check if the range contains a specific value.

    Args:
      value: the value to check.

    Returns:
      True iff value is contained in this range.
    """
    if self.__start is not None:
      if self.__start_inclusive and value < self.__start:
        return False
      if not self.__start_inclusive and value <= self.__start:
        return False
    if self.__end is not None:
      if self.__end_inclusive and value > self.__end:
        return False
      if not self.__end_inclusive and value >= self.__end:
        return False
    return True

  def Remap(self, mapper):
    """Transforms the range extremes with a function.

    The function mapper must preserve order, i.e.
      x rel_op y iff mapper(x) rel_op y

    Args:
      mapper: function to apply to the range extremes.
    """
    # Test against None, not truthiness: an extreme such as 0 or '' is a real
    # limit and must be mapped too.
    if self.__start is not None:
      self.__start = mapper(self.__start)
    if self.__end is not None:
      self.__end = mapper(self.__end)

  def MapExtremes(self, mapper):
    """Evaluate a function on the range extremes.

    Args:
      mapper: function to apply to the range extremes.

    Returns:
      (x, y) where x = None if the range has no start,
                       mapper(start, start_inclusive, False) otherwise
                   y = None if the range has no end,
                       mapper(end, end_inclusive, True) otherwise
    """
    mapped_start = None
    if self.__start is not None:
      mapped_start = mapper(self.__start, self.__start_inclusive, False)
    mapped_end = None
    if self.__end is not None:
      mapped_end = mapper(self.__end, self.__end_inclusive, True)
    return (mapped_start, mapped_end)
def ParseKeyFilteredQuery(filters, orders):
  """Parse queries which only allow filters and ascending-orders on __key__.

  Raises exceptions for illegal queries.

  Args:
    filters: the normalized filters of a query.
    orders: the normalized orders of a query.

  Returns:
    The key range (a ValueRange over datastore_types.Key) requested in the
    query.

  Raises:
    apiproxy_errors.ApplicationError: (via Check) if a filter or order other
      than the supported ones on __key__ is present.
  """
  remaining_filters = []
  key_range = ValueRange()
  key_prop = datastore_types.KEY_SPECIAL_PROPERTY
  for f in filters:
    op = f.op()
    # Anything but a single-property comparison on __key__ is collected and
    # rejected below.
    if not (f.property_size() == 1 and
            f.property(0).name() == key_prop and
            not (op == datastore_pb.Query_Filter.IN or
                 op == datastore_pb.Query_Filter.EXISTS)):
      remaining_filters.append(f)
      continue

    val = f.property(0).value()
    Check(val.has_referencevalue(), '__key__ kind must be compared to a key')
    limit = datastore_types.FromReferenceProperty(val)
    key_range.Update(op, limit)

  remaining_orders = []
  for o in orders:
    if not (o.direction() == datastore_pb.Query_Order.ASCENDING and
            o.property() == datastore_types.KEY_SPECIAL_PROPERTY):
      remaining_orders.append(o)
    else:
      # An ascending __key__ order ends the scan; later orders are ignored.
      break

  Check(not remaining_filters,
        'Only comparison filters on ' + key_prop + ' supported')
  Check(not remaining_orders,
        'Only ascending order on ' + key_prop + ' supported')

  return key_range
def ParseKindQuery(query, filters, orders):
  """Parse __kind__ (schema) queries.

  Raises exceptions for illegal queries.

  Args:
    query: the query; only has_ancestor() is consulted here.
    filters: the normalized filters from query.
    orders: the normalized orders from query.

  Returns:
    The kind range (a ValueRange over string) requested in the query.
  """
  Check(not query.has_ancestor(), 'ancestor queries on __kind__ not allowed')

  key_range = ParseKeyFilteredQuery(filters, orders)
  # Convert the extremes from __kind__ keys to plain kind names.
  key_range.Remap(_KindKeyToString)

  return key_range
def _KindKeyToString(key):
  """Extract kind name from __kind__ key.

  Raises an ApplicationError if the key is not of the form '__kind__'/name.

  Args:
    key: a key for a __kind__ instance.

  Returns:
    kind specified by key.
  """
  key_path = key.to_path()
  if (len(key_path) == 2 and key_path[0] == '__kind__' and
      isinstance(key_path[1], basestring)):
    return key_path[1]
  Check(False, 'invalid Key for __kind__ table')
def ParseNamespaceQuery(query, filters, orders):
  """Parse __namespace__ queries.

  Raises exceptions for illegal queries.

  Args:
    query: the query; only has_ancestor() is consulted here.
    filters: the normalized filters from query.
    orders: the normalized orders from query.

  Returns:
    The namespace range (a ValueRange over string) requested in the query.
  """
  Check(not query.has_ancestor(),
        'ancestor queries on __namespace__ not allowed')

  key_range = ParseKeyFilteredQuery(filters, orders)
  # Convert the extremes from __namespace__ keys to namespace names.
  key_range.Remap(_NamespaceKeyToString)

  return key_range
def _NamespaceKeyToString(key):
  """Extract namespace name from __namespace__ key.

  Raises an ApplicationError if the key is not of the form '__namespace__'/name
  or '__namespace__'/_EMPTY_NAMESPACE_ID.

  Args:
    key: a key for a __namespace__ instance.

  Returns:
    namespace specified by key.
  """
  key_path = key.to_path()
  if len(key_path) == 2 and key_path[0] == '__namespace__':
    # The empty namespace is keyed by a reserved id and maps to ''.
    if key_path[1] == datastore_types._EMPTY_NAMESPACE_ID:
      return ''
    if isinstance(key_path[1], basestring):
      return key_path[1]
  Check(False, 'invalid Key for __namespace__ table')
def ParsePropertyQuery(query, filters, orders):
  """Parse __property__ queries.

  Raises exceptions for illegal queries.

  Args:
    query: the query; its ancestor, if any, is folded into the returned range
      and then cleared from the query.
    filters: the normalized filters from query.
    orders: the normalized orders from query.

  Returns:
    The kind range (a ValueRange over (kind, property) pairs) requested
    in the query.
  """
  Check(not query.has_transaction(),
        'transactional queries on __property__ not allowed')

  key_range = ParseKeyFilteredQuery(filters, orders)
  # Convert the extremes to (kind, property) pairs; a kind-only key maps to
  # (kind, '').
  key_range.Remap(lambda x: _PropertyKeyToString(x, ''))

  if query.has_ancestor():
    ancestor = datastore_types.Key._FromPb(query.ancestor())
    ancestor_kind, ancestor_property = _PropertyKeyToString(ancestor, None)

    if ancestor_property is not None:
      # The ancestor names one property: restrict the range to that pair.
      key_range.Update(datastore_pb.Query_Filter.EQUAL,
                       (ancestor_kind, ancestor_property))
    else:
      # The ancestor names only a kind: bound the range by (kind, '') and
      # (kind + '\0', '').
      key_range.Update(datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL,
                       (ancestor_kind, ''))
      key_range.Update(datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL,
                       (ancestor_kind + '\0', ''))
    query.clear_ancestor()

  return key_range
def _PropertyKeyToString(key, default_property):
  """Splits a __property__ key into its kind and property names.

  Raises an ApplicationError if the key is not of the form
  '__kind__'/kind, '__property__'/property or '__kind__'/kind

  Args:
    key: a key for a __property__ instance.
    default_property: property value to return when key only has a kind.

  Returns:
    kind, property if key = '__kind__'/kind, '__property__'/property
    kind, default_property if key = '__kind__'/kind
  """
  path = key.to_path()
  depth = len(path)

  # Both accepted shapes start with '__kind__'/<string kind>.
  if (depth in (2, 4) and
      path[0] == '__kind__' and isinstance(path[1], basestring)):
    if depth == 2:
      return (path[1], default_property)
    if path[2] == '__property__' and isinstance(path[3], basestring):
      return (path[1], path[3])

  Check(False, 'invalid Key for __property__ table')
def SynthesizeUserId(email):
  """Return a synthetic user ID from an email address.

  Note that this is not the same user ID found in the production system.

  Args:
    email: An email address.

  Returns:
    A string userid derived from the email address.
  """
  user_id_digest = _MD5_FUNC(email.lower()).digest()
  # '1' followed by the first 20 characters of the digest bytes rendered as
  # zero-padded decimal numbers.
  user_id = '1' + ''.join(['%02d' % ord(x) for x in user_id_digest])[:20]
  return user_id
def FillUsersInQuery(filters):
  """Fill in a synthetic user ID for all user properties in a set of filters.

  Args:
    filters: The normalized filters from query.
  """
  for query_filter in filters:
    for filter_prop in query_filter.property_list():
      FillUser(filter_prop)
def FillUser(property):
  """Sets a synthetic user ID on a property that carries a user value.

  Properties without a user value are left untouched.

  Args:
    property: A Property which may have a user value.
  """
  if not property.value().has_uservalue():
    return
  email = property.value().uservalue().email()
  uid = SynthesizeUserId(email)
  property.mutable_value().mutable_uservalue().set_obfuscated_gaiaid(uid)
class BaseCursor(object):
  """A base query cursor over a list of entities.

  Public properties:
    cursor: the integer cursor.
    app: the app for which this cursor was created.
    keys_only: whether the query is keys_only.

  Class attributes:
    _next_cursor: the next cursor to allocate.
    _next_cursor_lock: protects _next_cursor.
  """
  # NOTE(review): the initial value was not visible in the reviewed section;
  # 1 is assumed -- confirm.
  _next_cursor = 1
  _next_cursor_lock = threading.Lock()

  def __init__(self, query, dsquery, orders, index_list):
    """Constructor.

    Args:
      query: the query request proto.
      dsquery: a datastore_query.Query over query.
      orders: the orders of query as returned by _GuessOrders.
      index_list: the list of indexes used by the query.
    """
    self.keys_only = query.keys_only()
    self.property_names = set(query.property_name_list())
    self.group_by = set(query.group_by_property_name_list())
    self.app = query.app()
    self.cursor = self._AcquireCursorID()

    self.__order_compare_entities = dsquery._order.cmp_for_filter(
        dsquery._filter_predicate)
    # The cursor position is described by the group by properties when there
    # are any, otherwise by the order properties plus __key__.
    if self.group_by:
      self.__cursor_properties = self.group_by
    else:
      self.__cursor_properties = set(order.property() for order in orders)
      self.__cursor_properties.add('__key__')
    self.__cursor_properties = frozenset(self.__cursor_properties)

    # Reads orders[0], so orders must not be empty.
    self.__first_sort_order = orders[0].direction()
    self.__index_list = index_list

  def _PopulateResultMetadata(self, query_result, compile,
                              first_result, last_result):
    """Fills in the cursor-related metadata of a query result.

    Args:
      query_result: the result proto to modify in place.
      compile: whether a compiled cursor should be written.
      first_result: whether this is the first result batch of the query; the
        index list is only attached to that one.
      last_result: the last entity returned, used for the compiled cursor.
    """
    query_result.set_keys_only(self.keys_only)
    if query_result.more_results():
      cursor = query_result.mutable_cursor()
      cursor.set_app(self.app)
      cursor.set_cursor(self.cursor)
    if compile:
      self._EncodeCompiledCursor(last_result,
                                 query_result.mutable_compiled_cursor())
    if first_result:
      query_result.index_list().extend(self.__index_list)

  @classmethod
  def _AcquireCursorID(cls):
    """Acquires the next cursor id in a thread safe manner."""
    cls._next_cursor_lock.acquire()
    try:
      cursor_id = cls._next_cursor
      cls._next_cursor += 1
    finally:
      # Always release, even if the increment fails.
      cls._next_cursor_lock.release()
    return cursor_id

  def _IsBeforeCursor(self, entity, cursor):
    """True if entity is before cursor according to the current order.

    Args:
      entity: a entity_pb.EntityProto entity.
      cursor: a compiled cursor as returned by _DecodeCompiledCursor.
    """
    # Compare only the parts of the entity that the cursor records.
    comparison_entity = entity_pb.EntityProto()
    for prop in entity.property_list():
      if prop.name() in self.__cursor_properties:
        comparison_entity.add_property().MergeFrom(prop)
    if cursor[0].has_key():
      comparison_entity.mutable_key().MergeFrom(entity.key())
    x = self.__order_compare_entities(comparison_entity, cursor[0])
    # cursor[1] is the inclusive flag: an inclusive cursor keeps an entity
    # equal to the cursor position, an exclusive one skips it.
    if cursor[1]:
      return x < 0
    else:
      return x <= 0

  def _DecodeCompiledCursor(self, compiled_cursor):
    """Converts a compiled_cursor into a cursor_entity.

    Args:
      compiled_cursor: The datastore_pb.CompiledCursor to decode.

    Returns:
      (cursor_entity, inclusive): a entity_pb.EntityProto and if it should
      be included in the result set.

    Raises:
      apiproxy_errors.ApplicationError: (via Check) if the cursor's values do
        not match the properties this query's cursors record.
    """
    assert compiled_cursor.has_position()

    position = compiled_cursor.position()

    # Every expected property must be supplied exactly once by the cursor.
    remaining_properties = set(self.__cursor_properties)

    cursor_entity = entity_pb.EntityProto()
    if position.has_key():
      cursor_entity.mutable_key().CopyFrom(position.key())
      try:
        remaining_properties.remove('__key__')
      except KeyError:
        Check(False, 'Cursor does not match query: extra value __key__')
    for indexvalue in position.indexvalue_list():
      cursor_prop = cursor_entity.add_property()
      cursor_prop.set_name(indexvalue.property())
      cursor_prop.mutable_value().CopyFrom(indexvalue.value())
      try:
        remaining_properties.remove(indexvalue.property())
      except KeyError:
        Check(False, 'Cursor does not match query: extra value %s' %
              indexvalue.property())
    Check(not remaining_properties,
          'Cursor does not match query: missing values for %r' %
          remaining_properties)

    return (cursor_entity, position.start_inclusive())

  def _EncodeCompiledCursor(self, last_result, compiled_cursor):
    """Converts the current state of the cursor into a compiled_cursor.

    Args:
      last_result: the last result returned by this query.
      compiled_cursor: an empty datastore_pb.CompiledCursor.
    """
    # With no last result, compiled_cursor is left empty.
    if last_result is not None:
      position = compiled_cursor.mutable_position()

      if '__key__' in self.__cursor_properties:
        position.mutable_key().MergeFrom(last_result.key())
      for prop in last_result.property_list():
        if prop.name() in self.__cursor_properties:
          indexvalue = position.add_indexvalue()
          indexvalue.set_property(prop.name())
          indexvalue.mutable_value().CopyFrom(prop.value())
      # Written as exclusive, so decoding this cursor skips last_result.
      position.set_start_inclusive(False)
      _SetBeforeAscending(position, self.__first_sort_order)
1132 class ListCursor(BaseCursor
):
1133 """A query cursor over a list of entities.
1136 keys_only: whether the query is keys_only
1139 def __init__(self
, query
, dsquery
, orders
, index_list
, results
):
1143 query: the query request proto
1144 dsquery: a datastore_query.Query over query.
1145 orders: the orders of query as returned by _GuessOrders.
1146 index_list: the list of indexes used by the query.
1147 results: list of entity_pb.EntityProto
1149 super(ListCursor
, self
).__init
__(query
, dsquery
, orders
, index_list
)
1155 for result
in results
:
1156 key_value
= _GetGroupByKey(result
, self
.group_by
)
1157 if key_value
not in distincts
:
1158 distincts
.add(key_value
)
1159 new_results
.append(result
)
1160 results
= new_results
1162 if query
.has_compiled_cursor() and query
.compiled_cursor().has_position():
1163 start_cursor
= self
._DecodeCompiledCursor
(query
.compiled_cursor())
1164 self
.__last
_result
= start_cursor
[0]
1165 start_cursor_position
= self
._GetCursorOffset
(results
, start_cursor
)
1167 self
.__last
_result
= None
1168 start_cursor_position
= 0
1170 if query
.has_end_compiled_cursor():
1171 if query
.end_compiled_cursor().has_position():
1172 end_cursor
= self
._DecodeCompiledCursor
(query
.end_compiled_cursor())
1173 end_cursor_position
= self
._GetCursorOffset
(results
, end_cursor
)
1175 end_cursor_position
= 0
1177 end_cursor_position
= len(results
)
1180 results
= results
[start_cursor_position
:end_cursor_position
]
1183 if query
.has_limit():
1184 limit
= query
.limit()
1186 limit
+= query
.offset()
1187 if limit
>= 0 and limit
< len(results
):
1188 results
= results
[:limit
]
1190 self
.__results
= results
1192 self
.__count
= len(self
.__results
)
1194 def _GetCursorOffset(self
, results
, cursor
):
1195 """Converts a cursor into a offset into the result set even if the
1196 cursor's entity no longer exists.
1199 results: the query's results (sequence of entity_pb.EntityProto)
1200 cursor: a compiled cursor as returned by _DecodeCompiledCursor
1207 mid
= (lo
+ hi
) // 2
1208 if self
._IsBeforeCursor
(results
[mid
], cursor
):
1214 def PopulateQueryResult(self
, result
, count
, offset
,
1215 compile=False, first_result
=False):
1216 """Populates a QueryResult with this cursor and the given number of results.
1219 result: datastore_pb.QueryResult
1220 count: integer of how many results to return
1221 offset: integer of how many results to skip
1222 compile: boolean, whether we are compiling this query
1223 first_result: whether the query result is the first for this query
1225 Check(offset
>= 0, 'Offset must be >= 0')
1227 offset
= min(offset
, self
.__count
- self
.__offset
)
1228 limited_offset
= min(offset
, _MAX_QUERY_OFFSET
)
1230 self
.__offset
+= limited_offset
1231 result
.set_skipped_results(limited_offset
)
1233 if compile and result
.skipped_results() > 0:
1234 self
._EncodeCompiledCursor
(self
.__results
[self
.__offset
- 1],
1235 result
.mutable_skipped_results_compiled_cursor())
1236 if offset
== limited_offset
and count
:
1238 if count
> _MAXIMUM_RESULTS
:
1239 count
= _MAXIMUM_RESULTS
1240 results
= self
.__results
[self
.__offset
:self
.__offset
+ count
]
1241 count
= len(results
)
1242 self
.__offset
+= count
1248 result
.result_list().extend(
1249 LoadEntity(entity
, self
.keys_only
, self
.property_names
)
1250 for entity
in results
)
1252 for entity
in results
:
1253 self
._EncodeCompiledCursor
(entity
,
1254 result
.add_result_compiled_cursor())
1258 self
.__last
_result
= self
.__results
[self
.__offset
- 1]
1260 result
.set_more_results(self
.__offset
< self
.__count
)
1261 self
._PopulateResultMetadata
(result
, compile,
1262 first_result
, self
.__last
_result
)
def _SynchronizeTxn(function):
  """A decorator that locks a transaction during the function call.

  The transaction's _lock is held for the whole call and released even when
  the call raises. Before delegating, the transaction state is checked
  against LiveTxn.ACTIVE with the message 'transaction closed'.

  Args:
    function: the callable to wrap; its first positional argument is the
      transaction.

  Returns:
    A wrapper that keeps the name and docstring of function.
  """
  import functools  # Local import so the block does not depend on file imports.

  @functools.wraps(function)
  def sync(txn, *args, **kwargs):
    txn._lock.acquire()
    try:
      # Refuse to operate on a transaction that is no longer active.
      Check(txn._state is LiveTxn.ACTIVE, 'transaction closed')

      return function(txn, *args, **kwargs)
    finally:
      txn._lock.release()
  return sync
def _GetEntityGroup(ref):
  """Returns the entity group key for the given reference.

  Args:
    ref: the reference to derive the entity group from; it is not modified.

  Returns:
    A new entity_pb.Reference copied from ref whose path is truncated to its
    first element.
  """
  group_ref = entity_pb.Reference()
  group_ref.CopyFrom(ref)

  # The root path element must be complete (carry an id or a name).
  root = group_ref.path().element_list()[0]
  assert (root.has_id() or
          root.has_name())

  # Drop everything below the root element.
  del group_ref.path().element_list()[1:]
  return group_ref
def _GetKeyKind(key):
  """Return the kind of the given key.

  Args:
    key: a reference whose path() holds at least one element.

  Returns:
    The type() of the last element of the key's path.
  """
  path_elements = key.path().element_list()
  leaf = path_elements[-1]
  return leaf.type()
def _FilterIndexesByKind(key, indexes):
  """Return only the indexes with the specified kind.

  Args:
    key: the key whose kind (see _GetKeyKind) selects the indexes.
    indexes: an iterable of composite indexes exposing
      definition().entity_type().

  Returns:
    A list of the indexes whose entity type equals the kind of key, in their
    original order.
  """
  # The kind does not depend on the index, so compute it once.
  kind = _GetKeyKind(key)
  return [index for index in indexes
          if index.definition().entity_type() == kind]
1303 class LiveTxn(object):
1304 """An in flight transaction."""
1329 _commit_time_s
= None
1331 def __init__(self
, txn_manager
, app
, allow_multiple_eg
):
1332 assert isinstance(txn_manager
, BaseTransactionManager
)
1333 assert isinstance(app
, basestring
)
1335 self
._txn
_manager
= txn_manager
1337 self
._allow
_multiple
_eg
= allow_multiple_eg
1340 self
._entity
_groups
= {}
1342 self
._lock
= threading
.RLock()
1343 self
._apply
_lock
= threading
.Lock()
1346 self
._cost
= datastore_pb
.Cost()
1352 self
._kind
_to
_indexes
= collections
.defaultdict(list)
def _GetTracker(self, reference):
  """Gets the entity group tracker for reference.

  The first time an entity group is seen a new tracker is created and
  remembered. Before that, the reference's app is checked against the
  transaction's app and the number of entity groups already tracked is
  checked against the limit (_MAX_EG_PER_TXN for cross-group transactions,
  one otherwise).

  Args:
    reference: the reference whose entity group tracker is wanted.

  Returns:
    The EntityGroupTracker for the entity group of reference.
  """
  entity_group = _GetEntityGroup(reference)
  group_key = datastore_types.ReferenceToKeyValue(entity_group)
  known = self._entity_groups.get(group_key, None)
  if known is not None:
    return known

  Check(self._app == reference.app(),
        'Transactions cannot span applications (expected %s, got %s)' %
        (self._app, reference.app()))
  if self._allow_multiple_eg:
    Check(len(self._entity_groups) < _MAX_EG_PER_TXN,
          'operating on too many entity groups in a single transaction.')
  else:
    Check(len(self._entity_groups) < 1,
          "cross-groups transaction need to be explicitly "
          "specified (xg=True)")

  created = EntityGroupTracker(entity_group)
  self._entity_groups[group_key] = created
  return created
def _GetAllTrackers(self):
  """Get the trackers for the transaction's entity groups.

  When no entity group has been touched yet, a tracker for a '__global__'
  key is registered first, so the result is never empty. That case arises
  when the txn only contains transactional tasks.

  Returns:
    The tracker list for the entity groups used in this txn.
  """
  if not self._entity_groups:
    global_key = datastore_types.Key.from_path(
        '__global__', 1, _app=self._app)
    self._GetTracker(global_key._ToPb())
  return self._entity_groups.values()
def _GrabSnapshot(self, reference):
  """Gets snapshot for this reference, creating it if necessary.

  When the entity group of reference had no snapshot yet, one is taken and
  every other entity group that already has a snapshot is re-validated: if
  its log position moved past the position this transaction read, the
  transaction is marked FAILED and a CONCURRENT_TRANSACTION error is raised.

  Args:
    reference: A entity_pb.Reference from which to extract the entity group.

  Returns:
    The snapshot held by the entity group's tracker.

  Raises:
    apiproxy_errors.ApplicationError if the snapshot is not consistent.
  """
  tracker = self._GetTracker(reference)
  is_first_read = tracker._snapshot is None
  snapshot = tracker._GrabSnapshot(self._txn_manager)
  if not is_first_read:
    return snapshot

  # Only groups that were already read can have become stale.
  previously_read = []
  for other in self._entity_groups.values():
    if other._snapshot is not None and other != tracker:
      previously_read.append(other)
  meta_data_list = [other._meta_data for other in previously_read]

  self._txn_manager._AcquireWriteLocks(meta_data_list)
  try:
    for other in previously_read:
      if other._meta_data._log_pos != other._read_pos:
        self._state = self.FAILED
        raise apiproxy_errors.ApplicationError(
            datastore_pb.Error.CONCURRENT_TRANSACTION,
            'Concurrency exception.')
  finally:
    self._txn_manager._ReleaseWriteLocks(meta_data_list)
  return snapshot
def Get(self, reference):
  """Returns the entity associated with the given entity_pb.Reference or None.

  The lookup goes through the entity group snapshot, so it does not see any
  modifications in the current txn.

  Args:
    reference: The entity_pb.Reference of the entity to look up.

  Returns:
    The associated entity_pb.EntityProto or None if no such entity exists.
  """
  snapshot = self._GrabSnapshot(reference)
  lookup_key = datastore_types.ReferenceToKeyValue(reference)
  return LoadEntity(snapshot.get(lookup_key))
def GetQueryCursor(self, query, filters, orders, index_list,
                   filter_predicate=None):
  """Runs the given datastore_pb.Query and returns a QueryCursor for it.

  The query runs over the snapshot of its ancestor's entity group, so it
  does not see any modifications in the current txn.

  Args:
    query: The datastore_pb.Query to run; it must have an ancestor.
    filters: A list of filters that override the ones found on query.
    orders: A list of orders that override the ones found on query.
    index_list: A list of indexes used by the query.
    filter_predicate: an additional filter of type
      datastore_query.FilterPredicate. This is passed along to implement V4
      specific filters without changing the entire stub.

  Returns:
    A BaseCursor that can be used to fetch query results.
  """
  Check(query.has_ancestor(),
        'Query must have an ancestor when performed in a transaction.')
  ancestor_snapshot = self._GrabSnapshot(query.ancestor())
  entities = ancestor_snapshot.values()
  return _ExecuteQuery(entities, query, filters, orders, index_list,
                       filter_predicate)
def Put(self, entity, insert, indexes):
  """Puts the given entity.

  The write is only recorded on the entity group tracker; nothing is
  applied here.

  Args:
    entity: The entity_pb.EntityProto to put.
    insert: A boolean that indicates if we should fail if the entity already
      exists.
    indexes: The composite indexes that apply to the entity.
  """
  tracker = self._GetTracker(entity.key())
  put_key = datastore_types.ReferenceToKeyValue(entity.key())

  # A put replaces any delete recorded earlier for the same key.
  tracker._delete.pop(put_key, None)
  tracker._put[put_key] = (entity, insert)

  kind = _GetKeyKind(entity.key())
  self._kind_to_indexes[kind] = indexes
def Delete(self, reference, indexes):
  """Deletes the entity associated with the given reference.

  The delete is only recorded on the entity group tracker; nothing is
  applied here.

  Args:
    reference: The entity_pb.Reference of the entity to delete.
    indexes: The composite indexes that apply to the entity.
  """
  tracker = self._GetTracker(reference)
  delete_key = datastore_types.ReferenceToKeyValue(reference)

  # A delete replaces any put recorded earlier for the same key.
  tracker._put.pop(delete_key, None)
  tracker._delete[delete_key] = reference

  kind = _GetKeyKind(reference)
  self._kind_to_indexes[kind] = indexes
def AddActions(self, actions, max_actions=None):
  """Adds the given actions to the current txn.

  Args:
    actions: A list of pbs to send to taskqueue.Add when the txn is applied.
    max_actions: A number that indicates the maximum number of actions to
      allow on this txn. A falsy value disables the limit.
  """
  if max_actions:
    within_limit = len(self._actions) + len(actions) <= max_actions
  else:
    # No limit configured: the sizes are not even inspected.
    within_limit = True
  Check(within_limit,
        'Too many messages, maximum allowed %s' % max_actions)
  self._actions.extend(actions)
1516 """Rollback the current txn."""
1518 self
._lock
.acquire()
1520 Check(self
._state
is self
.ACTIVE
or self
._state
is self
.FAILED
,
1521 'transaction closed')
1522 self
._state
= self
.ROLLEDBACK
1524 self
._txn
_manager
._RemoveTxn
(self
)
1526 self
._lock
.release()
1530 """Commits the current txn.
1532 This function hands off the responsibility of calling _Apply to the owning
1536 The cost of the transaction.
1540 trackers
= self
._GetAllTrackers
()
1542 for tracker
in trackers
:
1543 snapshot
= tracker
._GrabSnapshot
(self
._txn
_manager
)
1544 empty
= empty
and not tracker
._put
and not tracker
._delete
1547 for entity
, insert
in tracker
._put
.itervalues():
1548 Check(not insert
or self
.Get(entity
.key()) is None,
1549 'the id allocated for a new entity was already '
1550 'in use, please try again')
1553 key
= datastore_types
.ReferenceToKeyValue(entity
.key())
1555 old_entity
= snapshot
[key
]
1556 self
._AddWriteOps
(old_entity
, entity
)
1558 for reference
in tracker
._delete
.itervalues():
1562 key
= datastore_types
.ReferenceToKeyValue(reference
)
1564 old_entity
= snapshot
[key
]
1565 if old_entity
is not None:
1566 self
._AddWriteOps
(None, old_entity
)
1569 if empty
and not self
._actions
:
1571 return datastore_pb
.Cost()
1574 meta_data_list
= [tracker
._meta
_data
for tracker
in trackers
]
1575 self
._txn
_manager
._AcquireWriteLocks
(meta_data_list
)
1583 for tracker
in trackers
:
1584 Check(tracker
._meta
_data
._log
_pos
== tracker
._read
_pos
,
1585 'Concurrency exception.',
1586 datastore_pb
.Error
.CONCURRENT_TRANSACTION
)
1589 for tracker
in trackers
:
1590 tracker
._meta
_data
.Log(self
)
1591 self
._state
= self
.COMMITED
1592 self
._commit
_time
_s
= time
.time()
1599 for action
in self
._actions
:
1601 apiproxy_stub_map
.MakeSyncCall(
1602 'taskqueue', 'Add', action
, api_base_pb
.VoidProto())
1603 except apiproxy_errors
.ApplicationError
, e
:
1604 logging
.warning('Transactional task %s has been dropped, %s',
1608 self
._txn
_manager
._RemoveTxn
(self
)
1610 self
._txn
_manager
._ReleaseWriteLocks
(meta_data_list
)
1613 self
._txn
_manager
._consistency
_policy
._OnCommit
(self
)
def _AddWriteOps(self, old_entity, new_entity):
  """Adds the cost of writing the new_entity to the _cost member.

  We assume that old_entity represents the current state of the Datastore.
  The composite indexes used are the ones recorded for new_entity's kind.

  Args:
    old_entity: Entity representing the current state in the Datastore.
    new_entity: Entity representing the desired state in the Datastore.
  """
  kind = _GetKeyKind(new_entity.key())
  write_ops = _CalculateWriteOps(
      self._kind_to_indexes[kind], old_entity, new_entity)
  entity_writes, index_writes = write_ops
  _UpdateCost(self._cost, entity_writes, index_writes)
1630 def _Apply(self
, meta_data
):
1631 """Applies the current txn on the given entity group.
1633 This function blindly performs the operations contained in the current txn.
1634 The calling function must acquire the entity group write lock and ensure
1635 transactions are applied in order.
1638 self
._apply
_lock
.acquire()
1641 assert self
._state
== self
.COMMITED
1642 for tracker
in self
._entity
_groups
.values():
1643 if tracker
._meta
_data
is meta_data
:
1647 assert tracker
._read
_pos
!= tracker
.APPLIED
1650 for entity
, insert
in tracker
._put
.itervalues():
1651 self
._txn
_manager
._Put
(entity
, insert
)
1654 for key
in tracker
._delete
.itervalues():
1655 self
._txn
_manager
._Delete
(key
)
1659 tracker
._read
_pos
= EntityGroupTracker
.APPLIED
1662 tracker
._meta
_data
.Unlog(self
)
1664 self
._apply
_lock
.release()
1667 class EntityGroupTracker(object):
1668 """An entity group involved a transaction."""
1684 def __init__(self
, entity_group
):
1685 self
._entity
_group
= entity_group
def _GrabSnapshot(self, txn_manager):
  """Snapshot this entity group, remembering the read position.

  The transaction manager is only consulted while no snapshot is held;
  later calls return the stored snapshot.

  Args:
    txn_manager: the object whose _GrabSnapshot(entity_group) returns a
      (meta_data, read_pos, snapshot) triple.

  Returns:
    The snapshot held by this tracker.
  """
  if self._snapshot is not None:
    return self._snapshot

  grabbed = txn_manager._GrabSnapshot(self._entity_group)
  self._meta_data, self._read_pos, self._snapshot = grabbed
  return self._snapshot
1697 class EntityGroupMetaData(object):
1698 """The meta_data assoicated with an entity group."""
def __init__(self, entity_group):
  """Creates the meta data for an entity group.

  Args:
    entity_group: the entity group this meta data describes.
  """
  # Transactions waiting to be applied; starts empty.
  self._apply_queue = []
  # The other methods of this class assert that this lock is held.
  self._write_lock = threading.Lock()
  self._entity_group = entity_group
1711 """Applies all outstanding txns."""
1713 assert self
._write
_lock
.acquire(False) is False
1715 while self
._apply
_queue
:
1716 self
._apply
_queue
[0]._Apply
(self
)
1719 """Add a pending transaction to this entity group.
1721 Requires that the caller hold the meta data lock.
1722 This also increments the current log position and clears the snapshot cache.
1725 assert self
._write
_lock
.acquire(False) is False
1726 self
._apply
_queue
.append(txn
)
1728 self
._snapshot
= None
def Unlog(self, txn):
  """Remove the first pending transaction from the apply queue.

  Requires that the caller hold the meta data lock.
  This checks that the first pending transaction is indeed txn.

  Args:
    txn: the transaction expected at the head of the apply queue.
  """
  # A non-blocking acquire must fail, i.e. the lock is already held.
  assert self._write_lock.acquire(False) is False

  is_head = self._apply_queue and self._apply_queue[0] is txn
  Check(is_head,
        'Transaction is not appliable',
        datastore_pb.Error.INTERNAL_ERROR)
  self._apply_queue.pop(0)
class BaseConsistencyPolicy(object):
  """A base class for a consistency policy to be used with a transaction manager.

  Both hooks raise NotImplementedError here; concrete policies override them.
  """

  def _OnCommit(self, txn):
    """Called after a LiveTxn has been committed.

    This function can decide whether to apply the txn right away.

    Args:
      txn: A LiveTxn that has been committed.
    """
    raise NotImplementedError()

  def _OnGroom(self, meta_data_list):
    """Called once for every global query.

    This function must acquire the write lock for any meta data before
    applying any outstanding txns.

    Args:
      meta_data_list: A list of EntityGroupMetaData objects.
    """
    raise NotImplementedError()
1773 class MasterSlaveConsistencyPolicy(BaseConsistencyPolicy
):
1774 """Enforces the Master / Slave consistency policy.
1776 Applies all txn on commit.
def _OnCommit(self, txn):
  """Applies the committed txn right away.

  Every entity group touched by txn is caught up while its write lock is
  held; afterwards the transaction manager's Write() is called once.

  Args:
    txn: the transaction that has just been committed.
  """
  for tracker in txn._GetAllTrackers():
    tracker._meta_data._write_lock.acquire()
    try:
      tracker._meta_data.CatchUp()
    finally:
      # Release even when applying fails.
      tracker._meta_data._write_lock.release()

  manager = txn._txn_manager
  manager.Write()
1793 def _OnGroom(self
, meta_data_list
):
1799 class BaseHighReplicationConsistencyPolicy(BaseConsistencyPolicy
):
1800 """A base class for High Replication Datastore consistency policies.
1802 All txn are applied asynchronously.
1805 def _OnCommit(self
, txn
):
def _OnGroom(self, meta_data_list):
  """Applies pending transactions the policy decides are due.

  For every entity group with a non-empty apply queue, transactions are
  applied from the head of the queue, under the group's write lock, until
  _ShouldApply declines one or the queue is empty.

  Args:
    meta_data_list: an iterable of EntityGroupMetaData objects.
  """
  for meta_data in meta_data_list:
    if not meta_data._apply_queue:
      # Nothing pending: do not take the lock at all.
      continue

    meta_data._write_lock.acquire()
    try:
      while meta_data._apply_queue:
        head = meta_data._apply_queue[0]
        if not self._ShouldApply(head, meta_data):
          # Transactions apply in order, so a declined head blocks the rest.
          break
        head._Apply(meta_data)
    finally:
      meta_data._write_lock.release()
def _ShouldApply(self, txn, meta_data):
  """Determines if the given transaction should be applied.

  Always raises NotImplementedError here; subclasses override it.

  Args:
    txn: the pending transaction at the head of the apply queue.
    meta_data: the entity group meta data the transaction is queued on.
  """
  raise NotImplementedError()
1832 class TimeBasedHRConsistencyPolicy(BaseHighReplicationConsistencyPolicy
):
1833 """A High Replication Datastore consiseny policy based on elapsed time.
1835 This class tries to simulate performance seen in the high replication
1836 datastore using estimated probabilities of a transaction commiting after a
1837 given amount of time.
1840 _classification_map
= [(.98, 100),
def SetClassificationMap(self, classification_map):
  """Set the probability a txn will be applied after a given amount of time.

  Args:
    classification_map: A list of tuples containing (float between 0 and 1,
      number of milliseconds) that define the probability of a transaction
      applying after a given amount of time. It is stored sorted.

  Raises:
    TypeError: if a probability is outside [0, 1] or a delay is not positive.
  """
  has_invalid_entry = any(
      prob < 0 or prob > 1 or delay <= 0
      for prob, delay in classification_map)
  if has_invalid_entry:
    raise TypeError(
        'classification_map must be a list of (probability, delay) tuples, '
        'found %r' % (classification_map,))

  self._classification_map = sorted(classification_map)
def _ShouldApplyImpl(self, elapsed_ms, classification):
  """Decides whether enough time has passed for the given classification.

  The first map entry whose rate is at least classification supplies the
  required delay; when none qualifies, the delay of the last entry is used.

  Args:
    elapsed_ms: milliseconds elapsed since the transaction committed.
    classification: the value compared against the rates of the map.

  Returns:
    True if elapsed_ms has reached the selected delay.
  """
  for bucket_rate, required_ms in self._classification_map:
    if classification <= bucket_rate:
      break
  return required_ms <= elapsed_ms
def _Classify(self, txn, meta_data):
  """Returns a pseudo-random float in [0, 1) for the (txn, meta_data) pair.

  The generator is seeded from the ids of both objects, so the same pair of
  live objects always yields the same value.

  Args:
    txn: the transaction being classified.
    meta_data: the entity group meta data the transaction is queued on.
  """
  seed = id(txn) ^ id(meta_data)
  return random.Random(seed).random()
def _ShouldApply(self, txn, meta_data):
  """Decides whether txn is due, based on the time since it committed.

  Args:
    txn: the pending transaction; its _commit_time_s is read.
    meta_data: the entity group meta data the transaction is queued on.

  Returns:
    The verdict of _ShouldApplyImpl for the elapsed time (in milliseconds)
    and the classification of the (txn, meta_data) pair.
  """
  seconds_since_commit = time.time() - txn._commit_time_s
  return self._ShouldApplyImpl(seconds_since_commit * 1000,
                               self._Classify(txn, meta_data))
1877 class PseudoRandomHRConsistencyPolicy(BaseHighReplicationConsistencyPolicy
):
1878 """A policy that always gives the same sequence of consistency decisions."""
1880 def __init__(self
, probability
=.5, seed
=0):
1884 probability: A number between 0 and 1 that is the likelihood of a
1885 transaction applying before a global query is executed.
1886 seed: A hashable object to use as a seed. Use None to use the current
1889 self
.SetProbability(probability
)
def SetProbability(self, probability):
  """Change the probability of a transaction applying.

  Args:
    probability: A number between 0 and 1 that determines the probability of
      a transaction applying before a global query is run.

  Raises:
    TypeError: if probability is outside [0, 1].
  """
  if probability < 0 or probability > 1:
    # Wrap the value in a 1-tuple so that a tuple argument is reported
    # verbatim instead of being unpacked by the % operator.
    raise TypeError('probability must be a number between 0 and 1, found %r' %
                    (probability,))
  self._probability = probability
def SetSeed(self, seed):
  """Reset the seed.

  Replaces the policy's random generator with a fresh one, so the sequence
  of decisions restarts.

  Args:
    seed: the value handed to random.Random.
  """
  fresh_generator = random.Random(seed)
  self._random = fresh_generator
def _ShouldApply(self, txn, meta_data):
  """Draws the next random number and compares it with the probability.

  Args:
    txn: unused.
    meta_data: unused.

  Returns:
    True if the draw is below the configured probability.
  """
  draw = self._random.random()
  return draw < self._probability
1912 class BaseTransactionManager(object):
1913 """A class that manages the state of transactions.
1915 This includes creating consistent snap shots for transactions.
def __init__(self, consistency_policy=None):
  """Creates a transaction manager.

  Args:
    consistency_policy: the policy to use; a MasterSlaveConsistencyPolicy is
      created when none (or a falsy value) is given.
  """
  super(BaseTransactionManager, self).__init__()

  if consistency_policy:
    self._consistency_policy = consistency_policy
  else:
    self._consistency_policy = MasterSlaveConsistencyPolicy()

  self._meta_data_lock = threading.Lock()
  # Call this class's Clear explicitly, not a subclass override.
  BaseTransactionManager.Clear(self)
def SetConsistencyPolicy(self, policy):
  """Set the consistency to use.

  Causes all data to be flushed.

  Args:
    policy: A obj inheriting from BaseConsistencyPolicy.

  Raises:
    TypeError: if policy is not a BaseConsistencyPolicy.
  """
  if isinstance(policy, BaseConsistencyPolicy):
    # Flush under the old policy before switching.
    self.Flush()
    self._consistency_policy = policy
    return

  raise TypeError('policy should be of type '
                  'datastore_stub_util.BaseConsistencyPolicy found %r.' %
                  (policy,))
1944 """Discards any pending transactions and resets the meta data."""
1946 self
._meta
_data
= {}
def BeginTransaction(self, app, allow_multiple_eg):
  """Start a transaction on the given app.

  The new transaction is registered in the txn map under its id(), which is
  also used as the handle of the returned proto.

  Args:
    app: A string representing the app for which to start the transaction.
    allow_multiple_eg: True if transactions can span multiple entity groups.

  Returns:
    A datastore_pb.Transaction for the created transaction
  """
  is_master_slave = isinstance(self._consistency_policy,
                               MasterSlaveConsistencyPolicy)
  Check(not (allow_multiple_eg and is_master_slave),
        'transactions on multiple entity groups only allowed with the '
        'High Replication datastore')

  txn = self._BeginTransaction(app, allow_multiple_eg)
  handle = id(txn)
  self._txn_map[handle] = txn

  transaction = datastore_pb.Transaction()
  transaction.set_app(app)
  transaction.set_handle(handle)
  return transaction
def GetTxn(self, transaction, request_trusted, request_app):
  """Gets the LiveTxn object associated with the given transaction.

  Args:
    transaction: The datastore_pb.Transaction to look up.
    request_trusted: A boolean indicating If the requesting app is trusted.
    request_app: A string representing the app making the request.

  Returns:
    The associated LiveTxn object.
  """
  resolved_app = datastore_types.ResolveAppId(request_app)
  CheckTransaction(request_trusted, resolved_app, transaction)

  txn = self._txn_map.get(transaction.handle())
  # The handle must be known and belong to the app named in the proto.
  is_known = txn and txn._app == transaction.app()
  not_found = ('Transaction(<%s>) not found' %
               str(transaction).replace('\n', ', '))
  Check(is_known, not_found)
  return txn
1990 """Attempts to apply any outstanding transactions.
1992 The consistency policy determins if a transaction should be applied.
1994 self
._meta
_data
_lock
.acquire()
1996 self
._consistency
_policy
._OnGroom
(self
._meta
_data
.itervalues())
1998 self
._meta
_data
_lock
.release()
2001 """Applies all outstanding transactions."""
2002 self
._meta
_data
_lock
.acquire()
2004 for meta_data
in self
._meta
_data
.itervalues():
2005 if not meta_data
._apply
_queue
:
2009 meta_data
._write
_lock
.acquire()
2013 meta_data
._write
_lock
.release()
2015 self
._meta
_data
_lock
.release()
def _GetMetaData(self, entity_group):
  """Safely gets the EntityGroupMetaData object for the given entity_group.

  The lookup, and the creation of a missing entry, happen while the meta
  data lock is held.

  Args:
    entity_group: the reference identifying the entity group.

  Returns:
    The EntityGroupMetaData stored for entity_group.
  """
  self._meta_data_lock.acquire()
  try:
    group_key = datastore_types.ReferenceToKeyValue(entity_group)

    cached = self._meta_data.get(group_key, None)
    if cached:
      return cached

    created = EntityGroupMetaData(entity_group)
    self._meta_data[group_key] = created
    return created
  finally:
    self._meta_data_lock.release()
def _BeginTransaction(self, app, allow_multiple_eg):
  """Starts a transaction without storing it in the txn_map.

  Args:
    app: the app the transaction runs on.
    allow_multiple_eg: True if the transaction can span entity groups.

  Returns:
    A new LiveTxn owned by this manager.
  """
  txn = LiveTxn(self, app, allow_multiple_eg)
  return txn
2036 def _GrabSnapshot(self
, entity_group
):
2037 """Grabs a consistent snapshot of the given entity group.
2040 entity_group: A entity_pb.Reference of the entity group of which the
2041 snapshot should be taken.
2044 A tuple of (meta_data, log_pos, snapshot) where log_pos is the current log
2045 position and snapshot is a map of reference key value to
2046 entity_pb.EntityProto.
2049 meta_data
= self
._GetMetaData
(entity_group
)
2050 meta_data
._write
_lock
.acquire()
2052 if not meta_data
._snapshot
:
2055 meta_data
._snapshot
= self
._GetEntitiesInEntityGroup
(entity_group
)
2056 return meta_data
, meta_data
._log
_pos
, meta_data
._snapshot
2059 meta_data
._write
_lock
.release()
def _AcquireWriteLocks(self, meta_data_list):
  """Acquire the write locks for the given entity group meta data.

  These locks must be released with _ReleaseWriteLocks before returning to
  the user.

  The locks are taken in ascending id() order. The ordering is spelled out
  with key=id rather than relying on the interpreter's default ordering of
  arbitrary objects.

  Args:
    meta_data_list: list of EntityGroupMetaData objects.
  """
  for meta_data in sorted(meta_data_list, key=id):
    meta_data._write_lock.acquire()
def _ReleaseWriteLocks(self, meta_data_list):
  """Release the write locks of the given entity group meta data.

  The locks are released in ascending id() order. The ordering is spelled
  out with key=id rather than relying on the interpreter's default ordering
  of arbitrary objects.

  Args:
    meta_data_list: list of EntityGroupMetaData objects.
  """
  for meta_data in sorted(meta_data_list, key=id):
    meta_data._write_lock.release()
def _RemoveTxn(self, txn):
  """Removes a LiveTxn from the txn_map (if present).

  Args:
    txn: the transaction to forget; it is looked up by its id().
  """
  handle = id(txn)
  # A default is supplied so that an unknown handle is not an error.
  self._txn_map.pop(handle, None)
def _Put(self, entity, insert):
  """Put the given entity.

  This must be implemented by a sub-class. The sub-class can assume that any
  needed consistency is enforced at a higher level (and can just put
  blindly).

  Args:
    entity: The entity_pb.EntityProto to put.
    insert: A boolean that indicates if we should fail if the entity already
      exists.
  """
  raise NotImplementedError()
def _Delete(self, reference):
  """Delete the entity associated with the specified reference.

  This must be implemented by a sub-class. The sub-class can assume that any
  needed consistency is enforced at a higher level (and can just delete
  blindly).

  Args:
    reference: The entity_pb.Reference of the entity to delete.
  """
  raise NotImplementedError()
def _GetEntitiesInEntityGroup(self, entity_group):
  """Gets the contents of a specific entity group.

  This must be implemented by a sub-class. The sub-class can assume that any
  needed consistency is enforced at a higher level (and can just blindly
  read).

  Other entity groups may be modified concurrently.

  Args:
    entity_group: A entity_pb.Reference of the entity group to get.

  Returns:
    A dict mapping datastore_types.ReferenceToKeyValue(key) to EntityProto
  """
  raise NotImplementedError()
2128 class BaseIndexManager(object):
2129 """A generic index manager that stores all data in memory."""
2138 WRITE_ONLY
= entity_pb
.CompositeIndex
.WRITE_ONLY
2139 READ_WRITE
= entity_pb
.CompositeIndex
.READ_WRITE
2140 DELETED
= entity_pb
.CompositeIndex
.DELETED
2141 ERROR
= entity_pb
.CompositeIndex
.ERROR
2143 _INDEX_STATE_TRANSITIONS
= {
2144 WRITE_ONLY
: frozenset((READ_WRITE
, DELETED
, ERROR
)),
2145 READ_WRITE
: frozenset((DELETED
,)),
2146 ERROR
: frozenset((DELETED
,)),
2147 DELETED
: frozenset((ERROR
,)),
2154 self
.__indexes
= collections
.defaultdict(list)
2155 self
.__indexes
_lock
= threading
.Lock()
2156 self
.__next
_index
_id
= 1
2157 self
.__index
_id
_lock
= threading
.Lock()
def __FindIndex(self, index):
  """Finds an existing index by definition.

  Args:
    index: entity_pb.CompositeIndex

  Returns:
    entity_pb.CompositeIndex, if it exists; otherwise None
  """
  app = index.app_id()
  # Test membership first so the lookup does not create an entry for app.
  if app not in self.__indexes:
    return None

  for candidate in self.__indexes[app]:
    if index.definition() == candidate.definition():
      return candidate
  return None
def CreateIndex(self, index, trusted=False, calling_app=None):
  """Stores a copy of index under a newly allocated id.

  Args:
    index: the entity_pb.CompositeIndex to create; its id must be 0 and it is
      updated in place with the allocated id.
    trusted: If the calling app is trusted.
    calling_app: The app requesting the change; it is passed through
      datastore_types.ResolveAppId first.

  Returns:
    The id assigned to the index.
  """
  calling_app = datastore_types.ResolveAppId(calling_app)
  CheckAppId(trusted, calling_app, index.app_id())
  Check(index.id() == 0, 'New index id must be 0.')
  Check(not self.__FindIndex(index), 'Index already exists.')

  # Allocate the id. The lock is released even if set_id raises, so a
  # failure here cannot block every later CreateIndex call.
  self.__index_id_lock.acquire()
  try:
    index.set_id(self.__next_index_id)
    self.__next_index_id += 1
  finally:
    self.__index_id_lock.release()

  # Store a private copy so later changes to the caller's pb are not seen.
  clone = entity_pb.CompositeIndex()
  clone.CopyFrom(index)
  app = index.app_id()
  clone.set_app_id(app)

  self.__indexes_lock.acquire()
  try:
    self.__indexes[app].append(clone)
  finally:
    self.__indexes_lock.release()

  self._OnIndexChange(index.app_id())

  return index.id()
def GetIndexes(self, app, trusted=False, calling_app=None):
  """Get the CompositeIndex objects for the given app.

  Args:
    app: the app whose indexes are wanted.
    trusted: If the calling app is trusted.
    calling_app: The app requesting the indexes; it is passed through
      datastore_types.ResolveAppId first.

  Returns:
    The list stored for app (not a copy).
  """
  resolved_app = datastore_types.ResolveAppId(calling_app)
  CheckAppId(trusted, resolved_app, app)

  stored = self.__indexes[app]
  return stored
def UpdateIndex(self, index, trusted=False, calling_app=None):
  """Moves the stored index with the same definition to index's state.

  Args:
    index: an entity_pb.CompositeIndex carrying the desired state.
    trusted: If the calling app is trusted.
    calling_app: The app requesting the change; it is passed through
      datastore_types.ResolveAppId first, as CreateIndex and GetIndexes do.
  """
  calling_app = datastore_types.ResolveAppId(calling_app)
  CheckAppId(trusted, calling_app, index.app_id())

  stored_index = self.__FindIndex(index)
  Check(stored_index, 'Index does not exist.')
  # Keeping the current state is always allowed; otherwise the move must be
  # listed in _INDEX_STATE_TRANSITIONS.
  Check(index.state() == stored_index.state() or
        index.state() in self._INDEX_STATE_TRANSITIONS[stored_index.state()],
        'cannot move index state from %s to %s' %
        (entity_pb.CompositeIndex.State_Name(stored_index.state()),
         (entity_pb.CompositeIndex.State_Name(index.state()))))

  self.__indexes_lock.acquire()
  try:
    stored_index.set_state(index.state())
  finally:
    self.__indexes_lock.release()

  self._OnIndexChange(index.app_id())
def DeleteIndex(self, index, trusted=False, calling_app=None):
  """Removes the stored index that has the same definition as index.

  Args:
    index: an entity_pb.CompositeIndex identifying the index to remove.
    trusted: If the calling app is trusted.
    calling_app: The app requesting the change; it is passed through
      datastore_types.ResolveAppId first, as CreateIndex and GetIndexes do.
  """
  calling_app = datastore_types.ResolveAppId(calling_app)
  CheckAppId(trusted, calling_app, index.app_id())

  stored_index = self.__FindIndex(index)
  Check(stored_index, 'Index does not exist.')

  app = index.app_id()
  self.__indexes_lock.acquire()
  try:
    self.__indexes[app].remove(stored_index)
  finally:
    self.__indexes_lock.release()

  self._OnIndexChange(index.app_id())
def _SideLoadIndex(self, index):
  """Stores index directly, without checks, id allocation or notification.

  The index is filed under index.app_id(), the same accessor every other
  method of this class uses to key the index table.

  Args:
    index: the composite index to store; it is stored as-is, not copied.
      NOTE(review): assumed to be an entity_pb.CompositeIndex like the
      indexes handled by the other methods -- confirm against callers.
  """
  self.__indexes[index.app_id()].append(index)
2251 def _OnIndexChange(self
, app_id
):
2255 class BaseDatastore(BaseTransactionManager
, BaseIndexManager
):
2256 """A base implemenation of a Datastore.
2258 This class implements common functions associated with a datastore and
2259 enforces security restrictions passed on by a stub or client. It is designed
2260 to be shared by any number of threads or clients serving any number of apps.
2262 If an app is not specified explicitly it is pulled from the env and assumed to
2268 _MAX_QUERY_COMPONENTS
= 100
2276 _MAX_ACTIONS_PER_TXN
= 5
2278 def __init__(self
, require_indexes
=False, consistency_policy
=None,
2279 use_atexit
=True, auto_id_policy
=SEQUENTIAL
):
2280 BaseTransactionManager
.__init
__(self
, consistency_policy
=consistency_policy
)
2281 BaseIndexManager
.__init
__(self
)
2283 self
._require
_indexes
= require_indexes
2284 self
._pseudo
_kinds
= {}
2285 self
.SetAutoIdPolicy(auto_id_policy
)
2292 atexit
.register(self
.Write
)
2295 """Clears out all stored values."""
2297 BaseTransactionManager
.Clear(self
)
def _RegisterPseudoKind(self, kind):
  """Registers a pseudo kind to be used to satisfy a meta data query.

  Args:
    kind: the pseudo kind; it is stored under kind.name and receives a weak
      proxy to this datastore as kind._stub.
  """
  # A weak proxy: the kind does not hold a strong reference to the datastore.
  stub_proxy = weakref.proxy(self)
  self._pseudo_kinds[kind.name] = kind
  kind._stub = stub_proxy
def GetQueryCursor(self, raw_query, trusted=False, calling_app=None,
                   filter_predicate=None):
  """Validates and runs a query, returning a cursor over its results.

  Queries carrying a transaction run inside that transaction. Ancestor
  queries on regular kinds run in a fresh, unregistered transaction. Every
  other query is a global query: pending transactions are groomed first.

  Args:
    raw_query: The non-validated datastore_pb.Query to run.
    trusted: If the calling app is trusted.
    calling_app: The app requesting the results or None to pull the app from
      the environment.
    filter_predicate: an additional filter of type
      datastore_query.FilterPredicate. This is passed along to implement V4
      specific filters without changing the entire stub.

  Returns:
    A BaseCursor that can be used to retrieve results.
  """
  calling_app = datastore_types.ResolveAppId(calling_app)
  CheckAppId(trusted, calling_app, raw_query.app())

  filters, orders = datastore_index.Normalize(raw_query.filter_list(),
                                              raw_query.order_list(),
                                              raw_query.property_name_list())

  CheckQuery(raw_query, filters, orders, self._MAX_QUERY_COMPONENTS)
  FillUsersInQuery(filters)

  # The index check is only made when no extra predicate is supplied.
  if filter_predicate is None:
    self._CheckHasIndex(raw_query, trusted, calling_app)

  index_list = self.__IndexListForQuery(raw_query)

  if raw_query.has_transaction():
    Check(raw_query.kind() not in self._pseudo_kinds,
          'transactional queries on "%s" not allowed' % raw_query.kind())
    txn = self.GetTxn(raw_query.transaction(), trusted, calling_app)
    # Forward the predicate here too, as the other two branches do.
    return txn.GetQueryCursor(raw_query, filters, orders, index_list,
                              filter_predicate)

  if raw_query.has_ancestor() and raw_query.kind() not in self._pseudo_kinds:
    # Run in a throw-away transaction that is not put in the txn map.
    txn = self._BeginTransaction(raw_query.app(), False)
    return txn.GetQueryCursor(raw_query, filters, orders, index_list,
                              filter_predicate)

  # Global query: let the consistency policy apply pending transactions.
  self.Groom()
  return self._GetQueryCursor(raw_query, filters, orders, index_list,
                              filter_predicate)
2366 def __IndexListForQuery(self
, query
):
2367 """Get the single composite index pb used by the query, if any, as a list.
2370 query: the datastore_pb.Query to compute the index list for
2373 A singleton list of the composite index pb used by the query,
2376 required
, kind
, ancestor
, props
= (
2377 datastore_index
.CompositeIndexForQuery(query
))
2380 composite_index_pb
= entity_pb
.CompositeIndex()
2381 composite_index_pb
.set_app_id(query
.app())
2382 composite_index_pb
.set_id(0)
2383 composite_index_pb
.set_state(entity_pb
.CompositeIndex
.READ_WRITE
)
2384 index_pb
= composite_index_pb
.mutable_definition()
2385 index_pb
.set_entity_type(kind
)
2386 index_pb
.set_ancestor(bool(ancestor
))
2387 for name
, direction
in datastore_index
.GetRecommendedIndexProperties(props
):
2388 prop_pb
= entity_pb
.Index_Property()
2389 prop_pb
.set_name(name
)
2390 prop_pb
.set_direction(direction
)
2391 index_pb
.property_list().append(prop_pb
)
2392 return [composite_index_pb
]
def Get(self, raw_keys, transaction=None, eventual_consistency=False,
        trusted=False, calling_app=None):
  """Get the entities for the given keys.

  Args:
    raw_keys: A list of unverified entity_pb.Reference objects.
    transaction: The datastore_pb.Transaction to use or None.
    eventual_consistency: If we should allow stale, potentially inconsistent
      results.
    trusted: If the calling app is trusted.
    calling_app: The app requesting the results or None to pull the app from
      the environment.

  Returns:
    A list containing the entity or None if no entity exists.
  """
  if not raw_keys:
    return []

  calling_app = datastore_types.ResolveAppId(calling_app)

  if not transaction and eventual_consistency:
    # Eventually consistent reads bypass transactions entirely.
    result = []
    for key in raw_keys:
      # Argument order is (trusted, calling_app, key), matching every other
      # CheckReference call in this class; the two leading arguments used to
      # be swapped here.
      CheckReference(trusted, calling_app, key)
      result.append(self._GetWithPseudoKinds(None, key))
    return result

  # Validate every key and bucket it (with its position in raw_keys) by
  # entity group.
  grouped_keys = collections.defaultdict(list)
  for i, key in enumerate(raw_keys):
    CheckReference(trusted, calling_app, key)
    entity_group = _GetEntityGroup(key)
    entity_group_key = datastore_types.ReferenceToKeyValue(entity_group)
    grouped_keys[entity_group_key].append((key, i))

  if transaction:
    # Read everything through the caller's transaction.
    txn = self.GetTxn(transaction, trusted, calling_app)
    return [self._GetWithPseudoKinds(txn, key) for key in raw_keys]
  else:
    # No transaction supplied: read each entity group in its own transaction
    # and write results back at the key's original position.
    result = [None] * len(raw_keys)

    def op(txn, v):
      key, i = v
      result[i] = self._GetWithPseudoKinds(txn, key)
    for keys in grouped_keys.itervalues():
      self._RunInTxn(keys, keys[0][0].app(), op)
    return result
2450 def _GetWithPseudoKinds(self
, txn
, key
):
2451 """Fetch entity key in txn, taking account of pseudo-kinds."""
2452 pseudo_kind
= self
._pseudo
_kinds
.get(_GetKeyKind(key
), None)
2454 return pseudo_kind
.Get(txn
, key
)
2458 return self
._Get
(key
)
2460 def Put(self
, raw_entities
, cost
, transaction
=None,
2461 trusted
=False, calling_app
=None):
2462 """Writes the given given entities.
2464 Updates an entity's key and entity_group in place if needed
2467 raw_entities: A list of unverified entity_pb.EntityProto objects.
2468 cost: Out param. The cost of putting the provided entities.
2469 transaction: The datastore_pb.Transaction to use or None.
2470 trusted: If the calling app is trusted.
2471 calling_app: The app requesting the results or None to pull the app from
2474 A list of entity_pb.Reference objects that indicates where each entity
2477 if not raw_entities
:
2480 calling_app
= datastore_types
.ResolveAppId(calling_app
)
2483 result
= [None] * len(raw_entities
)
2484 grouped_entities
= collections
.defaultdict(list)
2485 for i
, raw_entity
in enumerate(raw_entities
):
2486 CheckEntity(trusted
, calling_app
, raw_entity
)
2490 entity
= entity_pb
.EntityProto()
2491 entity
.CopyFrom(raw_entity
)
2494 for prop
in itertools
.chain(entity
.property_list(),
2495 entity
.raw_property_list()):
2498 last_element
= entity
.key().path().element_list()[-1]
2499 if not (last_element
.id() or last_element
.has_name()):
2503 if self
._auto
_id
_policy
== SEQUENTIAL
:
2504 last_element
.set_id(self
._AllocateSequentialIds
(entity
.key())[0])
2506 full_key
= self
._AllocateIds
([entity
.key()])[0]
2507 last_element
.set_id(full_key
.path().element_list()[-1].id())
2511 entity_group
= _GetEntityGroup(entity
.key())
2512 entity
.mutable_entity_group().CopyFrom(entity_group
.path())
2513 entity_group_key
= datastore_types
.ReferenceToKeyValue(entity_group
)
2514 grouped_entities
[entity_group_key
].append((entity
, insert
))
2518 key
= entity_pb
.Reference()
2519 key
.CopyFrom(entity
.key())
2524 txn
= self
.GetTxn(transaction
, trusted
, calling_app
)
2525 for group
in grouped_entities
.values():
2526 for entity
, insert
in group
:
2528 indexes
= _FilterIndexesByKind(entity
.key(), self
.GetIndexes(
2529 entity
.key().app(), trusted
, calling_app
))
2530 txn
.Put(entity
, insert
, indexes
)
2533 for entities
in grouped_entities
.itervalues():
2534 txn_cost
= self
._RunInTxn
(
2535 entities
, entities
[0][0].key().app(),
2537 lambda txn
, v
: txn
.Put(v
[0], v
[1], _FilterIndexesByKind(
2539 self
.GetIndexes(v
[0].key().app(), trusted
, calling_app
))))
2540 _UpdateCost(cost
, txn_cost
.entity_writes(), txn_cost
.index_writes())
2543 def Delete(self
, raw_keys
, cost
, transaction
=None,
2544 trusted
=False, calling_app
=None):
2545 """Deletes the entities associated with the given keys.
2548 raw_keys: A list of unverified entity_pb.Reference objects.
2549 cost: Out param. The cost of putting the provided entities.
2550 transaction: The datastore_pb.Transaction to use or None.
2551 trusted: If the calling app is trusted.
2552 calling_app: The app requesting the results or None to pull the app from
2558 calling_app
= datastore_types
.ResolveAppId(calling_app
)
2561 grouped_keys
= collections
.defaultdict(list)
2562 for key
in raw_keys
:
2563 CheckReference(trusted
, calling_app
, key
)
2564 entity_group
= _GetEntityGroup(key
)
2565 entity_group_key
= datastore_types
.ReferenceToKeyValue(entity_group
)
2566 grouped_keys
[entity_group_key
].append(key
)
2570 txn
= self
.GetTxn(transaction
, trusted
, calling_app
)
2571 for key
in raw_keys
:
2573 indexes
= _FilterIndexesByKind(key
, self
.GetIndexes(
2574 key
.app(), trusted
, calling_app
))
2575 txn
.Delete(key
, indexes
)
2578 for keys
in grouped_keys
.itervalues():
2580 txn_cost
= self
._RunInTxn
(
2581 keys
, keys
[0].app(),
2582 lambda txn
, key
: txn
.Delete(key
, _FilterIndexesByKind(
2583 key
, self
.GetIndexes(key
.app(), trusted
, calling_app
))))
2584 _UpdateCost(cost
, txn_cost
.entity_writes(), txn_cost
.index_writes())
def Touch(self, raw_keys, trusted=False, calling_app=None):
  """Applies all outstanding writes.

  Each key is validated and bucketed by entity group; an operation that does
  nothing is then run through _RunInTxn once per group.

  Args:
    raw_keys: the references whose entity groups should be touched.
    trusted: If the calling app is trusted.
    calling_app: The app making the request, or None to resolve it via
      datastore_types.ResolveAppId.
  """
  calling_app = datastore_types.ResolveAppId(calling_app)

  keys_by_group = collections.defaultdict(list)
  for reference in raw_keys:
    CheckReference(trusted, calling_app, reference)
    group_id = datastore_types.ReferenceToKeyValue(_GetEntityGroup(reference))
    keys_by_group[group_id].append(reference)

  def _do_nothing(unused_txn, unused_key):
    return None

  for group in keys_by_group.itervalues():
    self._RunInTxn(group, group[0].app(), _do_nothing)
2600 def _RunInTxn(self
, values
, app
, op
):
2601 """Runs the given values in a separate Txn.
2603 Retries up to _RETRIES times on CONCURRENT_TRANSACTION errors.
2606 values: A list of arguments to op.
2607 app: The app to create the Txn on.
2608 op: A function to run on each value in the Txn.
2611 The cost of the txn.
2614 backoff
= _INITIAL_RETRY_DELAY_MS
/ 1000.0
2617 txn
= self
._BeginTransaction
(app
, False)
2618 for value
in values
:
2621 except apiproxy_errors
.ApplicationError
, e
:
2622 if e
.application_error
== datastore_pb
.Error
.CONCURRENT_TRANSACTION
:
2625 if retries
<= _RETRIES
:
2627 backoff
*= _RETRY_DELAY_MULTIPLIER
2628 if backoff
* 1000.0 > _MAX_RETRY_DELAY_MS
:
2629 backoff
= _MAX_RETRY_DELAY_MS
/ 1000.0
def _CheckHasIndex(self, query, trusted=False, calling_app=None):
  """Checks if the query can be satisfied given the existing indexes.

  Nothing is checked for pseudo-kind queries or when indexes are not
  required.

  Args:
    query: the datastore_pb.Query to check
    trusted: True if the calling app is trusted (like dev_admin_console)
    calling_app: app_id of the current running application

  Raises:
    apiproxy_errors.ApplicationError: with NEED_INDEX when
      MinimalCompositeIndexForQuery reports that an index is still needed.
  """
  if query.kind() in self._pseudo_kinds or not self._require_indexes:
    return

  # Only indexes in the READ_WRITE state are offered to the planner.
  serving_definitions = (
      datastore_index.ProtoToIndexDefinition(index)
      for index in self.GetIndexes(query.app(), trusted, calling_app)
      if index.state() == entity_pb.CompositeIndex.READ_WRITE)
  minimal_index = datastore_index.MinimalCompositeIndexForQuery(
      query, serving_definitions)
  if minimal_index is None:
    return

  is_most_efficient, kind, ancestor, properties = minimal_index
  msg = ('This query requires a composite index that is not defined. '
         'You must update the index.yaml file in your application root.')
  if not is_most_efficient:
    # Spell out the index definition in the error message.
    yaml = datastore_index.IndexYamlForQuery(
        kind, ancestor,
        datastore_index.GetRecommendedIndexProperties(properties))
    msg += '\nThe following index is the minimum index required:\n' + yaml
  raise apiproxy_errors.ApplicationError(datastore_pb.Error.NEED_INDEX, msg)
def SetAutoIdPolicy(self, auto_id_policy):
  """Set value of _auto_id_policy flag (default SEQUENTIAL).

  SEQUENTIAL auto ID assignment behavior will eventually be deprecated
  and the default will be SCATTERED.

  Args:
    auto_id_policy: string constant.

  Raises:
    TypeError: if auto_id_policy is not one of SEQUENTIAL or SCATTERED.
  """
  valid_policies = (SEQUENTIAL, SCATTERED)
  if auto_id_policy not in valid_policies:
    # Interpolate the message here: passing the format string and its values
    # as separate TypeError arguments leaves the %s placeholders unexpanded.
    raise TypeError('auto_id_policy must be in %s, found %s instead' %
                    (valid_policies, auto_id_policy))
  self._auto_id_policy = auto_id_policy
2679 """Writes the datastore to disk."""
2682 def _GetQueryCursor(self
, query
, filters
, orders
, index_list
,
2684 """Runs the given datastore_pb.Query and returns a QueryCursor for it.
2686 This must be implemented by a sub-class. The sub-class does not need to
2687 enforced any consistency guarantees (and can just blindly read).
2690 query: The datastore_pb.Query to run.
2691 filters: A list of filters that override the ones found on query.
2692 orders: A list of orders that override the ones found on query.
2693 index_list: A list of indexes used by the query.
2694 filter_predicate: an additional filter of type
2695 datastore_query.FilterPredicate. This is passed along to implement V4
2696 specific filters without changing the entire stub.
2699 A BaseCursor that can be used to fetch query results.
2701 raise NotImplementedError
def _Get(self, reference):
  """Get the entity for the given reference or None.

  This must be implemented by a sub-class. The sub-class does not need to
  enforce any consistency guarantees (and can just blindly read).

  Args:
    reference: An entity_pb.Reference to look up.

  Returns:
    The entity_pb.EntityProto associated with the given reference or None.

  Raises:
    NotImplementedError: always, in this base implementation.
  """
  raise NotImplementedError()
def _AllocateSequentialIds(self, reference, size=1, max_id=None):
  """Allocate sequential ids for given reference.

  This base implementation only raises.

  Args:
    reference: An entity_pb.Reference to allocate an id for.
    size: The size of the range to allocate
    max_id: The upper bound of the range to allocate

  Returns:
    A tuple containing (min, max) of the allocated range.

  Raises:
    NotImplementedError: always, in this base implementation.
  """
  raise NotImplementedError()
def _AllocateIds(self, references):
  """Allocate or reserves IDs for the v4 datastore API.

  Incomplete keys are allocated scattered IDs. Complete keys have every id in
  their paths reserved in the appropriate ID space.

  This base implementation only raises.

  Args:
    references: a list of entity_pb.Reference objects to allocate or reserve

  Returns:
    a list of complete entity_pb.Reference objects corresponding to the
    incomplete keys in the input, with newly allocated ids.

  Raises:
    NotImplementedError: always, in this base implementation.
  """
  raise NotImplementedError()
def _NeedsIndexes(func):
  """A decorator for DatastoreStub methods that require or affect indexes.

  Updates indexes to match index.yaml before the call and updates index.yaml
  after the call if require_indexes is False. If root_path is not set, this is
  a no-op.

  Args:
    func: the method to wrap. It is called with the same arguments as the
      wrapper.

  Returns:
    A wrapper that calls self._SetupIndexes(), then func, and finally
    self._UpdateIndexes() (even when func raises). The wrapper keeps func's
    name and docstring.
  """
  # Imported locally so this block does not depend on a module-level import.
  import functools

  @functools.wraps(func)
  def UpdateIndexesWrapper(self, *args, **kwargs):
    self._SetupIndexes()
    try:
      return func(self, *args, **kwargs)
    finally:
      self._UpdateIndexes()

  return UpdateIndexesWrapper
2764 class EntityGroupPseudoKind(object):
2765 """A common implementation of get() for the __entity_group__ pseudo-kind.
2768 name: the pseudo-kind name
2770 name
= '__entity_group__'
2780 base_version
= int(time
.time() * 1e6
)
2782 def Get(self
, txn
, key
):
2783 """Fetch key of this pseudo-kind within txn.
2786 txn: transaction within which Get occurs, may be None if this is an
2787 eventually consistent Get.
2788 key: key of pseudo-entity to Get.
2791 An entity for key, or None if it doesn't exist.
2795 txn
= self
._stub
._BeginTransaction
(key
.app(), False)
2797 return self
.Get(txn
, key
)
2802 if isinstance(txn
._txn
_manager
._consistency
_policy
,
2803 MasterSlaveConsistencyPolicy
):
2812 if path
.element_size() != 2 or path
.element_list()[-1].id() != 1:
2815 tracker
= txn
._GetTracker
(key
)
2816 tracker
._GrabSnapshot
(txn
._txn
_manager
)
2818 eg
= entity_pb
.EntityProto()
2819 eg
.mutable_key().CopyFrom(key
)
2820 eg
.mutable_entity_group().CopyFrom(_GetEntityGroup(key
).path())
2821 version
= entity_pb
.Property()
2822 version
.set_name('__version__')
2823 version
.set_multiple(False)
2824 version
.mutable_value().set_int64value(
2825 tracker
._read
_pos
+ self
.base_version
)
2826 eg
.property_list().append(version
)
2829 def Query(self
, query
, filters
, orders
):
2830 """Perform a query on this pseudo-kind.
2833 query: the original datastore_pb.Query.
2834 filters: the filters from query.
2835 orders: the orders from query.
2838 always raises an error
2842 raise apiproxy_errors
.ApplicationError(
2843 datastore_pb
.Error
.BAD_REQUEST
, 'queries not supported on ' + self
.name
)
2846 class DatastoreStub(object):
2847 """A stub that maps datastore service calls on to a BaseDatastore.
2849 This class also keeps track of query cursors.
2857 super(DatastoreStub
, self
).__init
__()
2858 self
._datastore
= datastore
2859 self
._app
_id
= datastore_types
.ResolveAppId(app_id
)
2860 self
._trusted
= trusted
2861 self
._root
_path
= root_path
2864 self
.__query
_history
= {}
2867 self
.__query
_ci
_history
= set()
2871 self
._cached
_yaml
= (None, None, None)
2873 if self
._require
_indexes
or root_path
is None:
2875 self
._index
_yaml
_updater
= None
2878 self
._index
_yaml
_updater
= datastore_stub_index
.IndexYamlUpdater(
2881 DatastoreStub
.Clear(self
)
2884 """Clears out all stored values."""
2885 self
._query
_cursors
= {}
2886 self
.__query
_history
= {}
2887 self
.__query
_ci
_history
= set()
def QueryHistory(self):
  """Returns a dict that maps Query PBs to times they've been run.

  Only queries whose app() equals this stub's app id are included.
  """
  history = {}
  for query_pb, run_count in self.__query_history.items():
    if query_pb.app() == self._app_id:
      history[query_pb] = run_count
  return history
def _QueryCompositeIndexHistoryLength(self):
  """Returns the length of the CompositeIndex set for query history."""
  recorded_indexes = self.__query_ci_history
  return len(recorded_indexes)
def SetTrusted(self, trusted):
  """Set/clear the trusted bit in the stub.

  This bit indicates that the app calling the stub is trusted. A
  trusted app can write to datastores of other apps.

  Args:
    trusted: the new value of the trusted bit.
  """
  self._trusted = trusted
2912 def _Dynamic_Get(self
, req
, res
):
2915 transaction
= req
.has_transaction() and req
.transaction() or None
2918 if req
.allow_deferred() and req
.key_size() > _MAXIMUM_RESULTS
:
2922 keys_to_get
= req
.key_list()[-_MAXIMUM_RESULTS
:]
2923 deferred_keys
= req
.key_list()[:-_MAXIMUM_RESULTS
]
2924 res
.deferred_list().extend(deferred_keys
)
2927 keys_to_get
= req
.key_list()
2929 res
.set_in_order(not req
.allow_deferred())
2931 total_response_bytes
= 0
2932 for index
, entity
in enumerate(self
._datastore
.Get(keys_to_get
,
2934 req
.has_failover_ms(),
2937 entity_size
= entity
and entity
.ByteSize() or 0
2940 if (req
.allow_deferred()
2942 and total_response_bytes
+ entity_size
> _MAXIMUM_QUERY_RESULT_BYTES
):
2944 res
.deferred_list().extend(keys_to_get
[index
:])
2947 entity_result
= res
.add_entity()
2948 entity_result
.mutable_entity().CopyFrom(entity
)
2949 total_response_bytes
+= entity_size
2952 entity_result
= res
.add_entity()
2953 entity_result
.mutable_key().CopyFrom(keys_to_get
[index
])
2955 def _Dynamic_Put(self
, req
, res
):
2956 transaction
= req
.has_transaction() and req
.transaction() or None
2957 res
.key_list().extend(self
._datastore
.Put(req
.entity_list(),
2960 self
._trusted
, self
._app
_id
))
def _Dynamic_Delete(self, req, res):
  """Handles a Delete call by forwarding it to the underlying datastore.

  Args:
    req: the request; its key list and optional transaction are forwarded.
    res: the response; its mutable cost is passed as the cost out-param.
  """
  if req.has_transaction():
    # A falsy transaction value is normalised to None.
    transaction = req.transaction() or None
  else:
    transaction = None
  self._datastore.Delete(req.key_list(), res.mutable_cost(), transaction,
                         self._trusted, self._app_id)
def _Dynamic_Touch(self, req, _):
  """Handles a Touch call by forwarding the request's keys to the datastore.

  Args:
    req: the request whose key list is touched.
    _: the response, unused.
  """
  keys = req.key_list()
  self._datastore.Touch(keys, self._trusted, self._app_id)
2971 def _Dynamic_RunQuery(self
, query
, query_result
, filter_predicate
=None):
2972 self
.__UpgradeCursors
(query
)
2973 cursor
= self
._datastore
.GetQueryCursor(query
, self
._trusted
, self
._app
_id
,
2976 if query
.has_count():
2977 count
= query
.count()
2978 elif query
.has_limit():
2979 count
= query
.limit()
2981 count
= self
._BATCH
_SIZE
2983 cursor
.PopulateQueryResult(query_result
, count
, query
.offset(),
2984 query
.compile(), first_result
=True)
2985 if query_result
.has_cursor():
2986 self
._query
_cursors
[query_result
.cursor().cursor()] = cursor
2992 compiled_query
= query_result
.mutable_compiled_query()
2993 compiled_query
.set_keys_only(query
.keys_only())
2994 compiled_query
.mutable_primaryscan().set_index_name(query
.Encode())
2995 self
.__UpdateQueryHistory
(query
)
def __UpgradeCursors(self, query):
  """Upgrades compiled cursors in place.

  If the cursor position does not specify before_ascending, populate it.
  If before_ascending is already populated, use it and the sort direction
  from the query to set an appropriate value for start_inclusive.

  Both the start and the end compiled cursor of the query are upgraded.

  Args:
    query: datastore_pb.Query
  """
  # With no orders on the query there is no first direction to pass along.
  if query.order_list():
    first_sort_direction = query.order(0).direction()
  else:
    first_sort_direction = None

  start_cursor = query.compiled_cursor()
  end_cursor = query.end_compiled_cursor()
  self.__UpgradeCursor(start_cursor, first_sort_direction)
  self.__UpgradeCursor(end_cursor, first_sort_direction)
def __UpgradeCursor(self, compiled_cursor, first_sort_direction):
  """Upgrades a compiled cursor in place.

  If the cursor position does not specify before_ascending, populate it.
  If before_ascending is already populated, use it and the provided direction
  to set an appropriate value for start_inclusive.

  Cursors that are not plannable are left untouched.

  Args:
    compiled_cursor: datastore_pb.CompiledCursor
    first_sort_direction: first sort direction from the query or None
  """
  if not self.__IsPlannable(compiled_cursor):
    return

  if compiled_cursor.position().has_before_ascending():
    _SetStartInclusive(compiled_cursor.position(), first_sort_direction)
    return

  if compiled_cursor.position().has_start_inclusive():
    _SetBeforeAscending(compiled_cursor.position(), first_sort_direction)
def __IsPlannable(self, compiled_cursor):
  """Returns a truthy value if compiled_cursor is plannable.

  Args:
    compiled_cursor: datastore_pb.CompiledCursor

  Returns:
    The result of position.has_key() when that is truthy, otherwise the
    position's indexvalue_list() (falsy when empty).
  """
  position = compiled_cursor.position()
  keyed = position.has_key()
  if keyed:
    return keyed
  return position.indexvalue_list()
3044 def __UpdateQueryHistory(self
, query
):
3046 clone
= datastore_pb
.Query()
3047 clone
.CopyFrom(query
)
3050 clone
.clear_offset()
3052 if clone
in self
.__query
_history
:
3053 self
.__query
_history
[clone
] += 1
3055 self
.__query
_history
[clone
] = 1
3056 if clone
.app() == self
._app
_id
:
3057 self
.__query
_ci
_history
.add(
3058 datastore_index
.CompositeIndexForQuery(clone
))
def _Dynamic_Next(self, next_request, query_result):
  """Populates query_result with the next batch from a stored query cursor.

  Args:
    next_request: the request naming the cursor to continue. Its count (when
      set), offset and compile values are forwarded to the cursor.
    query_result: the result message, populated in place.
  """
  app = next_request.cursor().app()
  CheckAppId(self._trusted, self._app_id, app)

  cursor_id = next_request.cursor().cursor()
  cursor = self._query_cursors.get(cursor_id)
  Check(cursor and cursor.app == app, 'Cursor %d not found' % cursor_id)

  # An explicit count on the request overrides the stub's batch size.
  if next_request.has_count():
    count = next_request.count()
  else:
    count = self._BATCH_SIZE

  cursor.PopulateQueryResult(query_result, count, next_request.offset(),
                             next_request.compile(), first_result=False)

  # When the populated result carries no cursor, drop the stored one.
  if not query_result.has_cursor():
    del self._query_cursors[cursor_id]
3078 def _Dynamic_AddActions(self
, request
, _
):
3079 """Associates the creation of one or more tasks with a transaction.
3082 request: A taskqueue_service_pb.TaskQueueBulkAddRequest containing the
3083 tasks that should be created when the transaction is committed.
3089 if not request
.add_request_list():
3092 transaction
= request
.add_request_list()[0].transaction()
3093 txn
= self
._datastore
.GetTxn(transaction
, self
._trusted
, self
._app
_id
)
3095 for add_request
in request
.add_request_list():
3099 Check(add_request
.transaction() == transaction
,
3100 'Cannot add requests to different transactions')
3101 clone
= taskqueue_service_pb
.TaskQueueAddRequest()
3102 clone
.CopyFrom(add_request
)
3103 clone
.clear_transaction()
3104 new_actions
.append(clone
)
3106 txn
.AddActions(new_actions
, self
._MAX
_ACTIONS
_PER
_TXN
)
def _Dynamic_BeginTransaction(self, req, transaction):
  """Begins a transaction on the datastore and copies it into the response.

  Args:
    req: the request providing app() and allow_multiple_eg().
    transaction: the response message that receives the new transaction.
  """
  CheckAppId(self._trusted, self._app_id, req.app())
  started = self._datastore.BeginTransaction(
      req.app(), req.allow_multiple_eg())
  transaction.CopyFrom(started)
def _Dynamic_Commit(self, transaction, res):
  """Commits the given transaction and records its cost in the response.

  Args:
    transaction: the transaction to commit.
    res: the response whose mutable cost receives the result of Commit().
  """
  CheckAppId(self._trusted, self._app_id, transaction.app())
  txn = self._datastore.GetTxn(transaction, self._trusted, self._app_id)
  commit_cost = txn.Commit()
  res.mutable_cost().CopyFrom(commit_cost)
3118 def _Dynamic_Rollback(self
, transaction
, _
):
3119 CheckAppId(self
._trusted
, self
._app
_id
, transaction
.app())
3120 txn
= self
._datastore
.GetTxn(transaction
, self
._trusted
, self
._app
_id
)
3123 def _Dynamic_CreateIndex(self
, index
, id_response
):
3124 id_response
.set_value(self
._datastore
.CreateIndex(index
,
def _Dynamic_GetIndices(self, app_str, composite_indices):
  """Appends the datastore's indexes for an app to the response.

  Args:
    app_str: a message whose value() is the app to list indexes for.
    composite_indices: the response; its index list is extended in place.
  """
  destination = composite_indices.index_list()
  destination.extend(self._datastore.GetIndexes(
      app_str.value(), self._trusted, self._app_id))
def _Dynamic_UpdateIndex(self, index, _):
  """Forwards an index update to the underlying datastore.

  Args:
    index: the index to update.
    _: the response, unused.
  """
  backend = self._datastore
  backend.UpdateIndex(index, self._trusted, self._app_id)
def _Dynamic_DeleteIndex(self, index, _):
  """Forwards an index deletion to the underlying datastore.

  Args:
    index: the index to delete.
    _: the response, unused.
  """
  backend = self._datastore
  backend.DeleteIndex(index, self._trusted, self._app_id)
def _Dynamic_AllocateIds(self, allocate_ids_request, allocate_ids_response):
  """Allocates a sequential id range or reserves ids for the given keys.

  A request carries either a model key (allocate a range, optionally bounded
  by size/max) or a reserve list (reserve the ids of complete keys), never
  both.

  Args:
    allocate_ids_request: the request describing what to allocate or reserve.
    allocate_ids_response: the response; start/end are set to the allocated
      range, or to 0/0 when reserving.
  """
  Check(not allocate_ids_request.has_model_key()
        or not allocate_ids_request.reserve_list(),
        'Cannot allocate and reserve IDs in the same request')
  if allocate_ids_request.reserve_list():
    Check(not allocate_ids_request.has_size(),
          'Cannot specify size when reserving IDs')
    Check(not allocate_ids_request.has_max(),
          'Cannot specify max when reserving IDs')

  if allocate_ids_request.has_model_key():
    # CheckAppId takes (trusted, calling app, target app), as in every other
    # handler of this stub. The target app used to be passed first, which
    # put a non-empty string in the "trusted" position.
    CheckAppId(self._trusted, self._app_id,
               allocate_ids_request.model_key().app())

    reference = allocate_ids_request.model_key()

    (start, end) = self._datastore._AllocateSequentialIds(
        reference, allocate_ids_request.size(), allocate_ids_request.max())

    allocate_ids_response.set_start(start)
    allocate_ids_response.set_end(end)
  else:
    for reference in allocate_ids_request.reserve_list():
      CheckAppId(self._trusted, self._app_id, reference.app())
    self._datastore._AllocateIds(allocate_ids_request.reserve_list())
    allocate_ids_response.set_start(0)
    allocate_ids_response.set_end(0)
3167 def _SetupIndexes(self
, _open
=open):
3168 """Ensure that the set of existing composite indexes matches index.yaml.
3170 Note: this is similar to the algorithm used by the admin console for
3176 if not self
._root
_path
:
3178 index_yaml_file
= os
.path
.join(self
._root
_path
, 'index.yaml')
3179 if (self
._cached
_yaml
[0] == index_yaml_file
and
3180 os
.path
.exists(index_yaml_file
) and
3181 os
.path
.getmtime(index_yaml_file
) == self
._cached
_yaml
[1]):
3182 requested_indexes
= self
._cached
_yaml
[2]
3185 index_yaml_mtime
= os
.path
.getmtime(index_yaml_file
)
3186 fh
= _open(index_yaml_file
, 'r')
3187 except (OSError, IOError):
3188 index_yaml_data
= None
3191 index_yaml_data
= fh
.read()
3195 requested_indexes
= []
3196 if index_yaml_data
is not None:
3198 index_defs
= datastore_index
.ParseIndexDefinitions(index_yaml_data
)
3199 if index_defs
is not None and index_defs
.indexes
is not None:
3201 requested_indexes
= datastore_index
.IndexDefinitionsToProtos(
3204 self
._cached
_yaml
= (index_yaml_file
, index_yaml_mtime
,
3208 existing_indexes
= self
._datastore
.GetIndexes(
3209 self
._app
_id
, self
._trusted
, self
._app
_id
)
3212 requested
= dict((x
.definition().Encode(), x
) for x
in requested_indexes
)
3213 existing
= dict((x
.definition().Encode(), x
) for x
in existing_indexes
)
3217 for key
, index
in requested
.iteritems():
3218 if key
not in existing
:
3219 new_index
= entity_pb
.CompositeIndex()
3220 new_index
.CopyFrom(index
)
3221 new_index
.set_id(datastore_admin
.CreateIndex(new_index
))
3222 new_index
.set_state(entity_pb
.CompositeIndex
.READ_WRITE
)
3223 datastore_admin
.UpdateIndex(new_index
)
3228 for key
, index
in existing
.iteritems():
3229 if key
not in requested
:
3230 datastore_admin
.DeleteIndex(index
)
3234 if created
or deleted
:
3235 logging
.debug('Created %d and deleted %d index(es); total %d',
3236 created
, deleted
, len(requested
))
def _UpdateIndexes(self):
  """Asks the index.yaml updater, when one is configured, to update the file."""
  updater = self._index_yaml_updater
  if updater is None:
    return
  updater.UpdateIndexYaml()
3243 class StubQueryConverter(object):
3244 """Converter for v3 and v4 queries suitable for use in stubs."""
def __init__(self, entity_converter):
  """Creates a query converter.

  Args:
    entity_converter: the converter stored as self._entity_converter for
      later use by this object.
  """
  self._entity_converter = entity_converter
def v4_to_v3_compiled_cursor(self, v4_cursor, v3_compiled_cursor):
  """Converts a v4 cursor string to a v3 CompiledCursor.

  Args:
    v4_cursor: a string representing a v4 query cursor
    v3_compiled_cursor: a datastore_pb.CompiledCursor to populate

  Raises:
    datastore_pbs.InvalidConversionError: if the string cannot be decoded.
  """
  # Start from an empty message so stale fields never leak into the result.
  v3_compiled_cursor.Clear()
  try:
    v3_compiled_cursor.ParseFromString(v4_cursor)
  except ProtocolBuffer.ProtocolBufferDecodeError:
    # Report decode failures as a conversion error.
    raise datastore_pbs.InvalidConversionError('Invalid query cursor.')
def v3_to_v4_compiled_cursor(self, v3_compiled_cursor):
  """Converts a v3 CompiledCursor to a v4 cursor string.

  Args:
    v3_compiled_cursor: a datastore_pb.CompiledCursor

  Returns:
    a string representing a v4 query cursor
  """
  serialized = v3_compiled_cursor.SerializeToString()
  return serialized
3273 def v4_to_v3_query(self
, v4_partition_id
, v4_query
, v3_query
):
3274 """Converts a v4 Query to a v3 Query.
3277 v4_partition_id: a datastore_v4_pb.PartitionId
3278 v4_query: a datastore_v4_pb.Query
3279 v3_query: a datastore_pb.Query to populate
3282 InvalidConversionError if the query cannot be converted
3286 if v4_partition_id
.dataset_id():
3287 v3_query
.set_app(v4_partition_id
.dataset_id())
3288 if v4_partition_id
.has_namespace():
3289 v3_query
.set_name_space(v4_partition_id
.namespace())
3291 v3_query
.set_persist_offset(True)
3292 v3_query
.set_require_perfect_plan(True)
3293 v3_query
.set_compile(True)
3296 if v4_query
.has_limit():
3297 v3_query
.set_limit(v4_query
.limit())
3298 if v4_query
.offset():
3299 v3_query
.set_offset(v4_query
.offset())
3300 if v4_query
.has_start_cursor():
3301 self
.v4_to_v3_compiled_cursor(v4_query
.start_cursor(),
3302 v3_query
.mutable_compiled_cursor())
3303 if v4_query
.has_end_cursor():
3304 self
.v4_to_v3_compiled_cursor(v4_query
.end_cursor(),
3305 v3_query
.mutable_end_compiled_cursor())
3308 if v4_query
.kind_list():
3309 datastore_pbs
.check_conversion(len(v4_query
.kind_list()) == 1,
3310 'multiple kinds not supported')
3311 v3_query
.set_kind(v4_query
.kind(0).name())
3314 has_key_projection
= False
3315 for prop
in v4_query
.projection_list():
3316 if prop
.property().name() == datastore_pbs
.PROPERTY_NAME_KEY
:
3317 has_key_projection
= True
3319 v3_query
.add_property_name(prop
.property().name())
3320 if has_key_projection
and not v3_query
.property_name_list():
3321 v3_query
.set_keys_only(True)
3324 for prop
in v4_query
.group_by_list():
3325 v3_query
.add_group_by_property_name(prop
.name())
3328 self
.__populate
_v
3_filters
(v4_query
.filter(), v3_query
)
3331 for v4_order
in v4_query
.order_list():
3332 v3_order
= v3_query
.add_order()
3333 v3_order
.set_property(v4_order
.property().name())
3334 if v4_order
.has_direction():
3335 v3_order
.set_direction(v4_order
.direction())
3337 def v3_to_v4_query(self
, v3_query
, v4_query
):
3338 """Converts a v3 Query to a v4 Query.
3341 v3_query: a datastore_pb.Query
3342 v4_query: a datastore_v4_pb.Query to populate
3345 InvalidConversionError if the query cannot be converted
3349 datastore_pbs
.check_conversion(not v3_query
.has_distinct(),
3350 'distinct option not supported')
3351 datastore_pbs
.check_conversion(v3_query
.require_perfect_plan(),
3352 'non-perfect plans not supported')
3356 if v3_query
.has_limit():
3357 v4_query
.set_limit(v3_query
.limit())
3358 if v3_query
.offset():
3359 v4_query
.set_offset(v3_query
.offset())
3360 if v3_query
.has_compiled_cursor():
3361 v4_query
.set_start_cursor(
3362 self
.v3_to_v4_compiled_cursor(v3_query
.compiled_cursor()))
3363 if v3_query
.has_end_compiled_cursor():
3364 v4_query
.set_end_cursor(
3365 self
.v3_to_v4_compiled_cursor(v3_query
.end_compiled_cursor()))
3368 if v3_query
.has_kind():
3369 v4_query
.add_kind().set_name(v3_query
.kind())
3372 for name
in v3_query
.property_name_list():
3373 v4_query
.add_projection().mutable_property().set_name(name
)
3374 if v3_query
.keys_only():
3375 v4_query
.add_projection().mutable_property().set_name(
3376 datastore_pbs
.PROPERTY_NAME_KEY
)
3379 for name
in v3_query
.group_by_property_name_list():
3380 v4_query
.add_group_by().set_name(name
)
3383 num_v4_filters
= len(v3_query
.filter_list())
3384 if v3_query
.has_ancestor():
3387 if num_v4_filters
== 1:
3388 get_property_filter
= self
.__get
_property
_filter
3389 elif num_v4_filters
>= 1:
3390 v4_query
.mutable_filter().mutable_composite_filter().set_operator(
3391 datastore_v4_pb
.CompositeFilter
.AND
)
3392 get_property_filter
= self
.__add
_property
_filter
3394 if v3_query
.has_ancestor():
3395 self
.__v
3_query
_to
_v
4_ancestor
_filter
(v3_query
,
3396 get_property_filter(v4_query
))
3397 for v3_filter
in v3_query
.filter_list():
3398 self
.__v
3_filter
_to
_v
4_property
_filter
(v3_filter
,
3399 get_property_filter(v4_query
))
3402 for v3_order
in v3_query
.order_list():
3403 v4_order
= v4_query
.add_order()
3404 v4_order
.mutable_property().set_name(v3_order
.property())
3405 if v3_order
.has_direction():
3406 v4_order
.set_direction(v3_order
.direction())
def __get_property_filter(self, v4_query):
  """Returns the PropertyFilter from the query's top-level filter."""
  top_level_filter = v4_query.mutable_filter()
  return top_level_filter.mutable_property_filter()
def __add_property_filter(self, v4_query):
  """Adds and returns a PropertyFilter from the query's composite filter."""
  composite = v4_query.mutable_filter().mutable_composite_filter()
  new_sub_filter = composite.add_filter()
  return new_sub_filter.mutable_property_filter()
def __populate_v3_filters(self, v4_filter, v3_query):
  """Copies the constraints of a v4 Filter onto a v3 Query.

  A property filter whose operator is HAS_ANCESTOR becomes the query's
  ancestor; any other property filter is appended as a v3 filter. A composite
  filter must use AND and is flattened by recursing into each sub-filter.
  Unsupported input is rejected through datastore_pbs.check_conversion.

  Args:
    v4_filter: a datastore_v4_pb.Filter
    v3_query: a datastore_pb.Query to populate with filters
  """
  check = datastore_pbs.check_conversion

  check(not v4_filter.has_bounding_circle_filter(),
        'bounding circle filter not supported')
  check(not v4_filter.has_bounding_box_filter(),
        'bounding box filter not supported')

  if v4_filter.has_property_filter():
    prop_filter = v4_filter.property_filter()
    is_ancestor = (prop_filter.operator()
                   == datastore_v4_pb.PropertyFilter.HAS_ANCESTOR)

    if is_ancestor:
      # An ancestor constraint must be a key value on the __key__ property
      # and may appear only once per query.
      check(prop_filter.value().has_key_value(),
            'HAS_ANCESTOR requires a reference value')
      check(prop_filter.property().name() == datastore_pbs.PROPERTY_NAME_KEY,
            'unsupported property')
      check(not v3_query.has_ancestor(),
            'duplicate ancestor constraint')
      self._entity_converter.v4_to_v3_reference(
          prop_filter.value().key_value(),
          v3_query.mutable_ancestor())
      return

    # Ordinary property filter: the v3 filter is added (and its op set)
    # before the value is validated, as in the original statement order.
    v3_filter = v3_query.add_filter()
    property_name = prop_filter.property().name()
    v3_filter.set_op(prop_filter.operator())
    check(not prop_filter.value().list_value_list(),
          ('unsupported value type, %s, in property filter'
           ' on "%s"' % ('list_value', property_name)))
    v3_prop = v3_filter.add_property()
    v3_prop.set_multiple(False)
    v3_prop.set_name(property_name)
    self._entity_converter.v4_value_to_v3_property_value(
        prop_filter.value(), v3_prop.mutable_value())
    return

  if v4_filter.has_composite_filter():
    composite = v4_filter.composite_filter()
    check(composite.operator() == datastore_v4_pb.CompositeFilter.AND,
          'unsupported composite property operator')
    for sub_filter in composite.filter_list():
      self.__populate_v3_filters(sub_filter, v3_query)
def __v3_filter_to_v4_property_filter(self, v3_filter, v4_property_filter):
  """Converts a v3 Filter to a v4 PropertyFilter.

  Args:
    v3_filter: a datastore_pb.Filter
    v4_property_filter: a datastore_v4_pb.PropertyFilter to populate

  Raises:
    InvalidConversionError if the filter cannot be converted
  """
  # NOTE(review): the message of this first check is not visible in the
  # extracted source; 'invalid filter' is restored from memory - confirm.
  datastore_pbs.check_conversion(v3_filter.property_size() == 1,
                                 'invalid filter')
  # Only ops up to 5 are accepted; the op value is passed through unchanged
  # as the v4 operator below.
  datastore_pbs.check_conversion(v3_filter.op() <= 5,
                                 'unsupported filter op: %d' % v3_filter.op())

  v4_property_filter.Clear()
  v4_property_filter.set_operator(v3_filter.op())

  v3_property = v3_filter.property(0)
  v4_property_filter.mutable_property().set_name(v3_property.name())
  self._entity_converter.v3_property_to_v4_value(
      v3_property, True, v4_property_filter.mutable_value())
def __v3_query_to_v4_ancestor_filter(self, v3_query, v4_property_filter):
  """Converts a v3 Query to a v4 ancestor PropertyFilter.

  The target filter is cleared, then set to a HAS_ANCESTOR constraint on the
  key property whose value is the query's ancestor converted to a v4 key.

  Args:
    v3_query: a datastore_pb.Query
    v4_property_filter: a datastore_v4_pb.PropertyFilter to populate
  """
  v4_property_filter.Clear()
  v4_property_filter.set_operator(datastore_v4_pb.PropertyFilter.HAS_ANCESTOR)
  v4_property_filter.mutable_property().set_name(
      datastore_pbs.PROPERTY_NAME_KEY)

  v4_ancestor_key = v4_property_filter.mutable_value().mutable_key_value()
  self._entity_converter.v3_to_v4_key(v3_query.ancestor(), v4_ancestor_key)
# Single module-wide instance handed out by get_query_converter().
__query_converter = StubQueryConverter(datastore_pbs.get_entity_converter())


def get_query_converter():
  """Returns a converter for v3 and v4 queries (not suitable for production).

  This converter is suitable for use in stubs but not for production.

  Returns:
    a StubQueryConverter
  """
  return __query_converter
class StubServiceConverter(object):
  """Converter for v3/v4 request/response protos suitable for use in stubs."""

  def __init__(self, entity_converter, query_converter):
    """Stores the converters that the per-message conversions delegate to.

    Args:
      entity_converter: converter used for keys, references and entities
      query_converter: converter used for queries and compiled cursors
    """
    self._query_converter = query_converter
    self._entity_converter = entity_converter

  def v4_to_v3_cursor(self, v4_query_handle, v3_cursor):
    """Converts a v4 cursor string to a v3 Cursor.

    Args:
      v4_query_handle: a string representing a v4 query handle
      v3_cursor: a datastore_pb.Cursor to populate

    Returns:
      v3_cursor, populated from the handle

    Raises:
      datastore_pbs.InvalidConversionError: if the handle cannot be decoded
    """
    try:
      v3_cursor.ParseFromString(v4_query_handle)
    except ProtocolBuffer.ProtocolBufferDecodeError:
      raise datastore_pbs.InvalidConversionError('Invalid query handle.')
    return v3_cursor

  def _v3_to_v4_query_handle(self, v3_cursor):
    """Converts a v3 Cursor to a v4 query handle string.

    Args:
      v3_cursor: a datastore_pb.Cursor

    Returns:
      a string representing a v4 cursor
    """
    serialized_cursor = v3_cursor.SerializeToString()
    return serialized_cursor

  def v4_to_v3_txn(self, v4_txn, v3_txn):
    """Converts a v4 transaction string to a v3 Transaction.

    Args:
      v4_txn: a string representing a v4 transaction
      v3_txn: a datastore_pb.Transaction to populate

    Returns:
      v3_txn, populated from the string

    Raises:
      datastore_pbs.InvalidConversionError: if the string cannot be decoded
    """
    try:
      v3_txn.ParseFromString(v4_txn)
    except ProtocolBuffer.ProtocolBufferDecodeError:
      raise datastore_pbs.InvalidConversionError('Invalid transaction.')
    return v3_txn

  def _v3_to_v4_txn(self, v3_txn):
    """Converts a v3 Transaction to a v4 transaction string.

    Args:
      v3_txn: a datastore_pb.Transaction

    Returns:
      a string representing a v4 transaction
    """
    serialized_txn = v3_txn.SerializeToString()
    return serialized_txn

  def v4_to_v3_begin_transaction_req(self, app_id, v4_req):
    """Converts a v4 BeginTransactionRequest to a v3 BeginTransactionRequest.

    Args:
      app_id: the app id set on the v3 request
      v4_req: a datastore_v4_pb.BeginTransactionRequest

    Returns:
      a datastore_pb.BeginTransactionRequest
    """
    begin_req = datastore_pb.BeginTransactionRequest()
    begin_req.set_app(app_id)
    # The v4 cross_group flag is carried over as v3 allow_multiple_eg.
    begin_req.set_allow_multiple_eg(v4_req.cross_group())
    return begin_req

  def v3_to_v4_begin_transaction_resp(self, v3_resp):
    """Converts a v3 Transaction to a v4 BeginTransactionResponse.

    Args:
      v3_resp: a datastore_pb.Transaction

    Returns:
      a datastore_v4_pb.BeginTransactionResponse
    """
    begin_resp = datastore_v4_pb.BeginTransactionResponse()
    begin_resp.set_transaction(self._v3_to_v4_txn(v3_resp))
    return begin_resp

  def v4_rollback_req_to_v3_txn(self, v4_req):
    """Converts a v4 RollbackRequest to a v3 Transaction.

    Args:
      v4_req: a datastore_v4_pb.RollbackRequest

    Returns:
      a datastore_pb.Transaction
    """
    txn = datastore_pb.Transaction()
    self.v4_to_v3_txn(v4_req.transaction(), txn)
    return txn

  def v4_commit_req_to_v3_txn(self, v4_req):
    """Converts a v4 CommitRequest to a v3 Transaction.

    Args:
      v4_req: a datastore_v4_pb.CommitRequest

    Returns:
      a datastore_pb.Transaction
    """
    txn = datastore_pb.Transaction()
    self.v4_to_v3_txn(v4_req.transaction(), txn)
    return txn

  def v4_run_query_req_to_v3_query(self, v4_req):
    """Converts a v4 RunQueryRequest to a v3 Query.

    GQL is not supported.

    Args:
      v4_req: a datastore_v4_pb.RunQueryRequest

    Returns:
      a datastore_pb.Query
    """
    datastore_pbs.check_conversion(not v4_req.has_gql_query(),
                                   'GQL not supported')
    query = datastore_pb.Query()
    self._query_converter.v4_to_v3_query(
        v4_req.partition_id(), v4_req.query(), query)

    # The suggested batch size becomes the v3 count.
    if v4_req.has_suggested_batch_size():
      query.set_count(v4_req.suggested_batch_size())

    # A transaction takes precedence over the read consistency setting.
    read_options = v4_req.read_options()
    if read_options.has_transaction():
      self.v4_to_v3_txn(read_options.transaction(),
                        query.mutable_transaction())
    else:
      consistency = read_options.read_consistency()
      if consistency == datastore_v4_pb.ReadOptions.EVENTUAL:
        query.set_strong(False)
        query.set_failover_ms(-1)
      elif consistency == datastore_v4_pb.ReadOptions.STRONG:
        query.set_strong(True)

    if v4_req.has_min_safe_time_seconds():
      query.set_min_safe_time_seconds(v4_req.min_safe_time_seconds())

    return query

  def v3_to_v4_run_query_req(self, v3_req):
    """Converts a v3 Query to a v4 RunQueryRequest.

    Args:
      v3_req: a datastore_pb.Query

    Returns:
      a datastore_v4_pb.RunQueryRequest
    """
    run_req = datastore_v4_pb.RunQueryRequest()

    # Partition: app id becomes the dataset id; the namespace is copied only
    # when it is non-empty.
    partition = run_req.mutable_partition_id()
    partition.set_dataset_id(v3_req.app())
    if v3_req.name_space():
      partition.set_namespace(v3_req.name_space())

    if v3_req.has_count():
      run_req.set_suggested_batch_size(v3_req.count())

    has_txn = v3_req.has_transaction()
    datastore_pbs.check_conversion(
        not (has_txn and v3_req.has_failover_ms()),
        'Cannot set failover and transaction handle.')

    # Read options: transaction first, then strong, then eventual.
    if has_txn:
      run_req.mutable_read_options().set_transaction(
          self._v3_to_v4_txn(v3_req.transaction()))
    elif v3_req.strong():
      run_req.mutable_read_options().set_read_consistency(
          datastore_v4_pb.ReadOptions.STRONG)
    elif v3_req.has_failover_ms():
      run_req.mutable_read_options().set_read_consistency(
          datastore_v4_pb.ReadOptions.EVENTUAL)

    if v3_req.has_min_safe_time_seconds():
      run_req.set_min_safe_time_seconds(v3_req.min_safe_time_seconds())

    self._query_converter.v3_to_v4_query(v3_req, run_req.mutable_query())

    return run_req

  def v4_run_query_resp_to_v3_query_result(self, v4_resp):
    """Converts a V4 RunQueryResponse to a v3 QueryResult.

    Args:
      v4_resp: a datastore_v4_pb.QueryResult

    Returns:
      a datastore_pb.QueryResult
    """
    query_result = self.v4_to_v3_query_result(v4_resp.batch())

    # The query handle, when present, becomes the result's cursor.
    if v4_resp.has_query_handle():
      self.v4_to_v3_cursor(v4_resp.query_handle(),
                           query_result.mutable_cursor())

    return query_result

  def v3_to_v4_run_query_resp(self, v3_resp):
    """Converts a v3 QueryResult to a V4 RunQueryResponse.

    Args:
      v3_resp: a datastore_pb.QueryResult

    Returns:
      a datastore_v4_pb.RunQueryResponse
    """
    run_resp = datastore_v4_pb.RunQueryResponse()
    self.v3_to_v4_query_result_batch(v3_resp, run_resp.mutable_batch())

    if v3_resp.has_cursor():
      query_handle = self._query_converter.v3_to_v4_compiled_cursor(
          v3_resp.cursor())
      run_resp.set_query_handle(query_handle)

    return run_resp

  def v4_to_v3_next_req(self, v4_req):
    """Converts a v4 ContinueQueryRequest to a v3 NextRequest.

    Args:
      v4_req: a datastore_v4_pb.ContinueQueryRequest

    Returns:
      a datastore_pb.NextRequest
    """
    next_req = datastore_pb.NextRequest()
    next_req.set_compile(True)
    self.v4_to_v3_cursor(v4_req.query_handle(), next_req.mutable_cursor())
    return next_req

  def v3_to_v4_continue_query_resp(self, v3_resp):
    """Converts a v3 QueryResult to a v4 ContinueQueryResponse.

    Args:
      v3_resp: a datastore_pb.QueryResult

    Returns:
      a datastore_v4_pb.ContinueQueryResponse
    """
    continue_resp = datastore_v4_pb.ContinueQueryResponse()
    self.v3_to_v4_query_result_batch(v3_resp, continue_resp.mutable_batch())
    return continue_resp

  def v4_to_v3_get_req(self, v4_req):
    """Converts a v4 LookupRequest to a v3 GetRequest.

    Args:
      v4_req: a datastore_v4_pb.LookupRequest

    Returns:
      a datastore_pb.GetRequest
    """
    get_req = datastore_pb.GetRequest()
    get_req.set_allow_deferred(True)

    # A transaction takes precedence over the read consistency setting.
    read_options = v4_req.read_options()
    if read_options.has_transaction():
      self.v4_to_v3_txn(read_options.transaction(),
                        get_req.mutable_transaction())
    else:
      consistency = read_options.read_consistency()
      if consistency == datastore_v4_pb.ReadOptions.EVENTUAL:
        get_req.set_strong(False)
        get_req.set_failover_ms(-1)
      elif consistency == datastore_v4_pb.ReadOptions.STRONG:
        get_req.set_strong(True)

    for v4_key in v4_req.key_list():
      self._entity_converter.v4_to_v3_reference(v4_key, get_req.add_key())

    return get_req

  def v3_to_v4_lookup_resp(self, v3_resp):
    """Converts a v3 GetResponse to a v4 LookupResponse.

    Args:
      v3_resp: a datastore_pb.GetResponse

    Returns:
      a datastore_v4_pb.LookupResponse
    """
    lookup_resp = datastore_v4_pb.LookupResponse()
    convert = self._entity_converter

    for deferred_ref in v3_resp.deferred_list():
      convert.v3_to_v4_key(deferred_ref, lookup_resp.add_deferred())

    for v3_entity in v3_resp.entity_list():
      # Entries carrying an entity are reported as found; entries carrying a
      # key are reported as missing. The two tests are independent.
      if v3_entity.has_entity():
        convert.v3_to_v4_entity(
            v3_entity.entity(),
            lookup_resp.add_found().mutable_entity())
      if v3_entity.has_key():
        convert.v3_to_v4_key(
            v3_entity.key(),
            lookup_resp.add_missing().mutable_entity().mutable_key())

    return lookup_resp

  def v4_to_v3_query_result(self, v4_batch):
    """Converts a v4 QueryResultBatch to a v3 QueryResult.

    Args:
      v4_batch: a datastore_v4_pb.QueryResultBatch

    Returns:
      a datastore_pb.QueryResult
    """
    query_result = datastore_pb.QueryResult()

    # Only NOT_FINISHED maps to "more results" on the v3 side.
    not_finished = (v4_batch.more_results()
                    == datastore_v4_pb.QueryResultBatch.NOT_FINISHED)
    query_result.set_more_results(not_finished)
    if v4_batch.has_end_cursor():
      self._query_converter.v4_to_v3_compiled_cursor(
          v4_batch.end_cursor(), query_result.mutable_compiled_cursor())

    result_type = v4_batch.entity_result_type()
    if result_type == datastore_v4_pb.EntityResult.PROJECTION:
      query_result.set_index_only(True)
    elif result_type == datastore_v4_pb.EntityResult.KEY_ONLY:
      query_result.set_keys_only(True)

    if v4_batch.has_skipped_results():
      query_result.set_skipped_results(v4_batch.skipped_results())

    strip_entity_group = result_type != datastore_v4_pb.EntityResult.FULL
    for v4_entity_result in v4_batch.entity_result_list():
      v3_entity = query_result.add_result()
      self._entity_converter.v4_to_v3_entity(v4_entity_result.entity(),
                                             v3_entity)
      if strip_entity_group:
        # Results that are not FULL entities have their entity group cleared.
        v3_entity.clear_entity_group()

    return query_result

  def v3_to_v4_query_result_batch(self, v3_result, v4_batch):
    """Converts a v3 QueryResult to a v4 QueryResultBatch.

    Args:
      v3_result: a datastore_pb.QueryResult
      v4_batch: a datastore_v4_pb.QueryResultBatch to populate
    """
    v4_batch.Clear()

    if v3_result.more_results():
      more_results = datastore_v4_pb.QueryResultBatch.NOT_FINISHED
    else:
      more_results = datastore_v4_pb.QueryResultBatch.MORE_RESULTS_AFTER_LIMIT
    v4_batch.set_more_results(more_results)

    if v3_result.has_compiled_cursor():
      end_cursor = self._query_converter.v3_to_v4_compiled_cursor(
          v3_result.compiled_cursor())
      v4_batch.set_end_cursor(end_cursor)

    # keys_only wins over index_only; anything else is a full entity result.
    if v3_result.keys_only():
      result_type = datastore_v4_pb.EntityResult.KEY_ONLY
    elif v3_result.index_only():
      result_type = datastore_v4_pb.EntityResult.PROJECTION
    else:
      result_type = datastore_v4_pb.EntityResult.FULL
    v4_batch.set_entity_result_type(result_type)

    if v3_result.has_skipped_results():
      v4_batch.set_skipped_results(v3_result.skipped_results())

    v4_results = v4_batch.entity_result_list()
    for v3_entity in v3_result.result_list():
      entity_result = datastore_v4_pb.EntityResult()
      self._entity_converter.v3_to_v4_entity(v3_entity,
                                             entity_result.mutable_entity())
      v4_results.append(entity_result)
# Single module-wide instance handed out by get_service_converter(); it
# shares the module's query converter.
__service_converter = StubServiceConverter(
    datastore_pbs.get_entity_converter(),
    __query_converter)


def get_service_converter():
  """Returns a converter for v3 and v4 service request/response protos.

  This converter is suitable for use in stubs but not for production.

  Returns:
    a StubServiceConverter
  """
  return __service_converter
def ReverseBitsInt64(v):
  """Reverse the bits of a 64-bit integer.

  Args:
    v: Input integer of type 'int' or 'long'.

  Returns:
    Bit-reversed input as 'int' on 64-bit machines or as 'long' otherwise.
  """
  # Exchange neighbouring groups of 1, 2, 4, 8 and then 16 bits.
  swap_steps = (
      (1, 0x5555555555555555),
      (2, 0x3333333333333333),
      (4, 0x0F0F0F0F0F0F0F0F),
      (8, 0x00FF00FF00FF00FF),
      (16, 0x0000FFFF0000FFFF),
  )
  for shift, mask in swap_steps:
    v = ((v >> shift) & mask) | ((v & mask) << shift)

  # Finally exchange the two 32-bit halves. Only the half shifted upwards is
  # masked ('&' binds tighter than '|' in the original expression).
  upper_half = v >> 32
  lower_half = (v << 32) & 0xFFFFFFFFFFFFFFFF
  return int(upper_half | lower_half)
def ToScatteredId(v):
  """Map counter value v to the scattered ID space.

  Translate to scattered ID space, then reverse bits.

  Args:
    v: Counter value from which to produce ID.

  Returns:
    Integer ID.

  Raises:
    datastore_errors.BadArgumentError if counter value exceeds the range of
  the scattered ID space.
  """
  if v >= _MAX_SCATTERED_COUNTER:
    raise datastore_errors.BadArgumentError('counter value too large (%d)' % v)

  # Shift the counter up before reversing, then offset the reversed value so
  # it lies above the sequential range.
  reversed_bits = ReverseBitsInt64(v << _SCATTER_SHIFT)
  return _MAX_SEQUENTIAL_ID + 1 + long(reversed_bits)
def IdToCounter(k):
  """Map ID k to the counter value from which it was generated.

  Determine whether k is sequential or scattered ID.

  NOTE(review): the def line and the first return are not visible in the
  extracted source; they are restored here and should be confirmed.

  Args:
    k: ID from which to infer counter value.

  Returns:
    Tuple of integers (counter_value, id_space).
  """
  if k > _MAX_SCATTERED_ID:
    # Above both ID spaces.
    return 0, SCATTERED
  if _MAX_SEQUENTIAL_ID < k <= _MAX_SCATTERED_ID:
    # Undo the bit reversal and shift applied when the ID was produced.
    return long(ReverseBitsInt64(k) >> _SCATTER_SHIFT), SCATTERED
  if k > 0:
    return long(k), SEQUENTIAL
  raise datastore_errors.BadArgumentError('invalid id (%d)' % k)
def CompareEntityPbByKey(a, b):
  """Compare two entity protobuf's by key.

  Args:
    a: entity_pb.EntityProto to compare
    b: entity_pb.EntityProto to compare

  Returns:
     <0 if a's key is before b's, =0 if they are the same key, and >0 otherwise.
  """
  key_of_a = datastore_types.Key._FromPb(a.key())
  key_of_b = datastore_types.Key._FromPb(b.key())
  return cmp(key_of_a, key_of_b)
def _GuessOrders(filters, orders):
  """Guess any implicit ordering.

  The datastore gives a logical, but not necessarily predictable, ordering when
  orders are not completely explicit. This function guesses at that ordering
  (which is better then always ordering by __key__ for tests).

  NOTE(review): the initial copy of `orders`, the `if not orders:` guard, the
  `break` and the final return are restored lines - confirm against the file.

  Args:
    filters: The datastore_pb.Query_Filter that have already been normalized and
      checked.
    orders: The datastore_pb.Query_Order that have already been normalized and
      checked. The list is copied; the caller uses the returned list.

  Returns:
    The list of orders, extended with the guessed ones.
  """
  guessed = orders[:]

  def add_order(property_name):
    # Appends a Query_Order on property_name with no explicit direction.
    order = datastore_pb.Query_Order()
    order.set_property(property_name)
    guessed.append(order)

  # With no explicit order, the first inequality filter's property leads.
  if not guessed:
    for filter_pb in filters:
      if filter_pb.op() in datastore_index.INEQUALITY_OPERATORS:
        add_order(filter_pb.property(0).name())
        break

  # Properties under an EXISTS filter follow, in sorted name order.
  exists_names = sorted(
      filter_pb.property(0).name() for filter_pb in filters
      if filter_pb.op() == datastore_pb.Query_Filter.EXISTS)
  for property_name in exists_names:
    add_order(property_name)

  # The ordering always ends on __key__.
  if not guessed or guessed[-1].property() != '__key__':
    add_order('__key__')

  return guessed
def _MakeQuery(query_pb, filters, orders, filter_predicate):
  """Make a datastore_query.Query for the given datastore_pb.Query.

  Overrides filters and orders in query with the specified arguments.

  NOTE(review): the `order=` keyword and the fall-through return of the
  unmodified query are restored lines - confirm against the file.

  Args:
    query_pb: a datastore_pb.Query.
    filters: the filters from query.
    orders: the orders from query.
    filter_predicate: an additional filter of type
        datastore_query.FilterPredicate. This is passed along to implement V4
        specific filters without changing the entire stub.

  Returns:
    A datastore_query.Query for the datastore_pb.Query."""
  # Build the query from a copy so query_pb itself is left untouched.
  rewritten_pb = datastore_pb.Query()
  rewritten_pb.CopyFrom(query_pb)
  rewritten_pb.clear_filter()
  rewritten_pb.clear_order()
  rewritten_pb.filter_list().extend(filters)
  rewritten_pb.order_list().extend(orders)

  base_query = datastore_query.Query._from_pb(rewritten_pb)

  # Combining predicates with AND below relies on AND being the only v4
  # composite operator.
  assert datastore_v4_pb.CompositeFilter._Operator_NAMES.values() == ['AND']

  if filter_predicate is None:
    return base_query

  combined_predicate = filter_predicate
  if base_query.filter_predicate is not None:
    combined_predicate = datastore_query.CompositeFilter(
        datastore_query.CompositeFilter.AND,
        [filter_predicate, base_query.filter_predicate])

  return datastore_query.Query(app=base_query.app,
                               namespace=base_query.namespace,
                               ancestor=base_query.ancestor,
                               filter_predicate=combined_predicate,
                               group_by=base_query.group_by,
                               order=base_query.order)
def _CreateIndexEntities(entity, postfix_props):
  """Creates entities for index values that would appear in production.

  This function finds all multi-valued properties listed in postfix_props, and
  creates a new entity for each unique combination of values. The resulting
  entities will only have a single value for each property listed in
  postfix_props.

  It reserves the right to include index data that would not be
  seen in production, e.g. by returning the original entity when no splitting
  is needed. LoadEntity will remove any excess fields.

  This simulates the results seen by an index scan in the datastore.

  Args:
    entity: The entity_pb.EntityProto to split.
    postfix_props: A set of property names to split on.

  Returns:
    A list of the split entity_pb.EntityProtos.
  """
  values_by_name = {}
  untouched_props = []
  needs_split = False

  for prop in entity.property_list():
    name = prop.name()
    if name not in postfix_props:
      untouched_props.append(prop)
      continue

    if name in values_by_name:
      # A second occurrence of the name means the entity must be split.
      needs_split = True
    else:
      values_by_name[name] = []
    distinct_values = values_by_name[name]
    value = prop.value()
    if value not in distinct_values:
      distinct_values.append(value)

  if not needs_split:
    # Nothing to split: hand back the original entity.
    return [entity]

  def add_index_value(target, name, value):
    # Adds a single-valued INDEX_VALUE property carrying value to target.
    index_prop = target.add_property()
    index_prop.set_name(name)
    index_prop.set_multiple(False)
    index_prop.set_meaning(entity_pb.Property.INDEX_VALUE)
    index_prop.mutable_value().CopyFrom(value)

  # Start from a copy holding only the properties that are not split on.
  base = entity_pb.EntityProto()
  base.CopyFrom(entity)
  base.clear_property()
  base.property_list().extend(untouched_props)
  results = [base]

  for name, distinct_values in values_by_name.iteritems():
    if len(distinct_values) == 1:
      # A single value is added to the current results in place.
      for result in results:
        add_index_value(result, name, distinct_values[0])
      continue

    # Several values: one copy of every current result per value.
    expanded = []
    for result in results:
      for value in distinct_values:
        copy = entity_pb.EntityProto()
        copy.CopyFrom(result)
        add_index_value(copy, name, value)
        expanded.append(copy)
    results = expanded

  return results
def _CreateIndexOnlyQueryResults(results, postfix_props):
  """Creates a result set similar to that returned by an index only query.

  Args:
    results: the entities to expand.
    postfix_props: the property names passed on to _CreateIndexEntities.

  Returns:
    A list with the index entities of every result, in input order.
  """
  index_entities = []
  for entity in results:
    index_entities += _CreateIndexEntities(entity, postfix_props)
  return index_entities
def _ExecuteQuery(results, query, filters, orders, index_list,
                  filter_predicate=None):
  """Executes the query on a superset of its results.

  Args:
    results: superset of results for query.
    query: a datastore_pb.Query.
    filters: the filters from query.
    orders: the orders from query.
    index_list: the list of indexes used by the query.
    filter_predicate: an additional filter of type
        datastore_query.FilterPredicate. This is passed along to implement V4
        specific filters without changing the entire stub.

  Returns:
    A ListCursor over the results of applying query to results.
  """
  full_orders = _GuessOrders(filters, orders)
  dsquery = _MakeQuery(query, filters, full_orders, filter_predicate)

  # A query naming properties is answered from index entities, split on the
  # properties that take part in the ordering.
  if query.property_name_size():
    ordered_names = set(order.property() for order in full_orders)
    results = _CreateIndexOnlyQueryResults(results, ordered_names)

  matching = datastore_query.apply_query(dsquery, results)
  return ListCursor(query, dsquery, full_orders, index_list, matching)
def _UpdateCost(cost, entity_writes, index_writes):
  """Updates the provided cost.

  Args:
    cost: Out param. The cost object to update.
    entity_writes: The number of entity writes to add.
    index_writes: The number of index writes to add.
  """
  total_entity_writes = cost.entity_writes() + entity_writes
  cost.set_entity_writes(total_entity_writes)

  total_index_writes = cost.index_writes() + index_writes
  cost.set_index_writes(total_index_writes)
def _CalculateWriteOps(composite_indexes, old_entity, new_entity):
  """Determines number of entity and index writes needed to write new_entity.

  We assume that old_entity represents the current state of the Datastore.

  Args:
    composite_indexes: The composite_indexes for the kind of the entities.
    old_entity: Entity representing the current state in the Datastore.
    new_entity: Entity representing the desired state in the Datastore.

  Returns:
    A tuple of size 2, where the first value is the number of entity writes and
    the second value is the number of index writes.
  """
  properties_unchanged = (
      old_entity is not None and
      old_entity.property_list() == new_entity.property_list() and
      old_entity.raw_property_list() == new_entity.raw_property_list())
  if properties_unchanged:
    # Same properties as what is already stored: nothing is written.
    return 0, 0

  index_writes = _ChangedIndexRows(composite_indexes, old_entity, new_entity)
  if old_entity is None:
    # A brand new entity costs one additional index write.
    index_writes += 1

  return 1, index_writes
def _ChangedIndexRows(composite_indexes, old_entity, new_entity):
  """Determine the number of index rows that need to change.

  We assume that old_entity represents the current state of the Datastore.

  Args:
    composite_indexes: The composite_indexes for the kind of the entities.
    old_entity: Entity representing the current state in the Datastore.
    new_entity: Entity representing the desired state in the Datastore

  Returns:
    The number of index rows that need to change.
  """
  # Property name -> set of serialized properties on the old entity.
  unique_old_properties = collections.defaultdict(set)

  # Property name -> set of serialized properties on the new entity.
  unique_new_properties = collections.defaultdict(set)

  if old_entity is not None:
    for old_prop in old_entity.property_list():
      _PopulateUniquePropertiesSet(old_prop, unique_old_properties)

  # Property name -> number of new properties that also exist on the old
  # entity.
  unchanged = collections.defaultdict(int)

  for new_prop in new_entity.property_list():
    new_prop_as_str = _PopulateUniquePropertiesSet(
        new_prop, unique_new_properties)
    if new_prop_as_str in unique_old_properties[new_prop.name()]:
      unchanged[new_prop.name()] += 1

  # Every property name seen on either entity gets its single-property
  # indexes considered. The original updated this set twice from
  # unique_old_properties and only picked up new-only names because the
  # defaultdict lookup above inserts them; read them from the new-properties
  # dict explicitly instead.
  all_property_names = set(unique_old_properties)
  all_property_names.update(unique_new_properties)
  all_property_names.update(unchanged)

  all_indexes = _GetEntityByPropertyIndexes(all_property_names)
  all_indexes.extend([comp.definition() for comp in composite_indexes])
  path_size = new_entity.key().path().element_size()

  writes = 0
  for index in all_indexes:
    # An ancestor index over more than one property is multiplied by the
    # number of elements in the new entity's key path.
    ancestor_multiplier = 1
    if index.ancestor() and index.property_size() > 1:
      ancestor_multiplier = path_size
    writes += (_CalculateWritesForCompositeIndex(
        index, unique_old_properties, unique_new_properties, unchanged) *
               ancestor_multiplier)
  return writes
def _PopulateUniquePropertiesSet(prop, unique_properties):
  """Populates a set containing unique properties.

  Args:
    prop: An entity property.
    unique_properties: Dictionary mapping property names to a set of unique
      properties.

  Returns:
    The property pb in string (hashable) form.
  """
  # Serialize with "multiple" cleared so that flag does not affect the
  # resulting string.
  normalized = _CopyAndSetMultipleToFalse(prop) if prop.multiple() else prop

  serialized = normalized.SerializePartialToString()
  unique_properties[normalized.name()].add(serialized)
  return serialized
def _CalculateWritesForCompositeIndex(index, unique_old_properties,
                                      unique_new_properties,
                                      common_properties):
  """Calculate the number of writes required to maintain a specific Index.

  Args:
    index: The composite index.
    unique_old_properties: Dictionary mapping property names to a set of props
      present on the old entity.
    unique_new_properties: Dictionary mapping property names to a set of props
      present on the new entity.
    common_properties: Dictionary mapping property names to the number of
      properties with that name that are present on both the old and new
      entities.

  Returns:
    The number of writes required to maintained the provided index.
  """
  old_rows = 1
  new_rows = 1
  shared_rows = 1
  # Each count is the product, over the index's properties, of the number of
  # values available for that property.
  for index_prop in index.property_list():
    name = index_prop.name()
    old_rows *= len(unique_old_properties[name])
    new_rows *= len(unique_new_properties[name])
    shared_rows *= common_properties[name]

  rows_to_remove = old_rows - shared_rows
  rows_to_add = new_rows - shared_rows
  return rows_to_remove + rows_to_add
def _GetEntityByPropertyIndexes(all_property_names):
  """Builds the single-property indexes for the given property names.

  Args:
    all_property_names: the property names to build indexes for.

  Returns:
    A list holding, for each name, an ascending index followed by a
    descending index.
  """
  return [
      _SinglePropertyIndex(prop_name, direction)
      for prop_name in all_property_names
      for direction in (entity_pb.Index_Property.ASCENDING,
                        entity_pb.Index_Property.DESCENDING)
  ]
def _SinglePropertyIndex(prop_name, direction):
  """Creates a single property Index for the given name and direction.

  Args:
    prop_name: The name of the single property on the Index.
    direction: The direction of the Index.

  Returns:
    A single property Index with the given property and direction.
  """
  single_index = entity_pb.Index()
  index_prop = single_index.add_property()
  index_prop.set_name(prop_name)
  index_prop.set_direction(direction)
  return single_index
def _CopyAndSetMultipleToFalse(prop):
  """Copy the provided Property and set its "multiple" attribute to False.

  Args:
    prop: The Property to copy.

  Returns:
    A copy of the given Property with its "multiple" attribute set to False.
  """
  # Merge into a fresh Property so the argument itself is never modified.
  single_valued = entity_pb.Property()
  single_valued.MergeFrom(prop)
  single_valued.set_multiple(False)
  return single_valued
def _SetStartInclusive(position, first_direction):
  """Sets the start_inclusive field in position.

  start_inclusive is set to whether before_ascending differs from "the first
  direction is DESCENDING".

  Args:
    position: datastore_pb.Position
    first_direction: the first sort order from the query
      (a datastore_pb.Query_Order) or None
  """
  is_descending = first_direction == datastore_pb.Query_Order.DESCENDING
  position.set_start_inclusive(position.before_ascending() != is_descending)
def _SetBeforeAscending(position, first_direction):
  """Sets the before_ascending field in position.

  before_ascending is set to whether start_inclusive differs from "the first
  direction is DESCENDING".

  Args:
    position: datastore_pb.Position
    first_direction: the first sort order from the query
      (a datastore_pb.Query_Order) or None
  """
  is_descending = first_direction == datastore_pb.Query_Order.DESCENDING
  position.set_before_ascending(position.start_inclusive() != is_descending)