3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
21 """Utility functions shared between the file and sqlite datastore stubs.
23 This module is internal and should not be used by client applications.
27 from __future__
import with_statement
37 _MD5_FUNC
= hashlib
.md5
53 from google
.net
.proto
import ProtocolBuffer
54 from google
.appengine
.datastore
import entity_pb
56 from google
.appengine
.api
import api_base_pb
57 from google
.appengine
.api
import apiproxy_stub_map
58 from google
.appengine
.api
import datastore_admin
59 from google
.appengine
.api
import datastore_errors
60 from google
.appengine
.api
import datastore_types
61 from google
.appengine
.api
.taskqueue
import taskqueue_service_pb
62 from google
.appengine
.datastore
import datastore_index
63 from google
.appengine
.datastore
import datastore_pb
64 from google
.appengine
.datastore
import datastore_pbs
65 from google
.appengine
.datastore
import datastore_query
66 from google
.appengine
.datastore
import datastore_stub_index
67 from google
.appengine
.datastore
import datastore_v4_pb
68 from google
.appengine
.runtime
import apiproxy_errors
73 _MAXIMUM_RESULTS
= 300
79 _MAXIMUM_QUERY_RESULT_BYTES
= 2000000
85 _MAX_QUERY_OFFSET
= 1000
89 _PROPERTY_TYPE_NAMES
= {
91 entity_pb
.PropertyValue
.kint64Value
: 'INT64',
92 entity_pb
.PropertyValue
.kbooleanValue
: 'BOOLEAN',
93 entity_pb
.PropertyValue
.kstringValue
: 'STRING',
94 entity_pb
.PropertyValue
.kdoubleValue
: 'DOUBLE',
95 entity_pb
.PropertyValue
.kPointValueGroup
: 'POINT',
96 entity_pb
.PropertyValue
.kUserValueGroup
: 'USER',
97 entity_pb
.PropertyValue
.kReferenceValueGroup
: 'REFERENCE'
102 _SCATTER_PROPORTION
= 32768
112 _BLOB_MEANINGS
= frozenset((entity_pb
.Property
.BLOB
,
113 entity_pb
.Property
.ENTITY_PROTO
,
114 entity_pb
.Property
.TEXT
))
126 _INITIAL_RETRY_DELAY_MS
= 100
130 _RETRY_DELAY_MULTIPLIER
= 2
134 _MAX_RETRY_DELAY_MS
= 120000
139 SEQUENTIAL
= 'sequential'
140 SCATTERED
= 'scattered'
146 _MAX_SEQUENTIAL_BIT
= 52
151 _MAX_SEQUENTIAL_COUNTER
= (1 << _MAX_SEQUENTIAL_BIT
) - 1
155 _MAX_SEQUENTIAL_ID
= _MAX_SEQUENTIAL_COUNTER
160 _MAX_SCATTERED_COUNTER
= (1 << (_MAX_SEQUENTIAL_BIT
- 1)) - 1
166 _MAX_SCATTERED_ID
= _MAX_SEQUENTIAL_ID
+ 1 + _MAX_SCATTERED_COUNTER
170 _SCATTER_SHIFT
= 64 - _MAX_SEQUENTIAL_BIT
+ 1
173 def _GetScatterProperty(entity_proto
):
174 """Gets the scatter property for an object.
176 For ease of implementation, this is not synchronized with the actual
177 value on the App Engine server, but should work equally well.
179 Note: This property may change, either here or in production. No client
180 other than the mapper framework should rely on it directly.
183 The PropertyValue of the scatter property or None if this entity should not
184 have a scatter property.
186 hash_obj
= _MD5_FUNC()
187 for element
in entity_proto
.key().path().element_list():
188 if element
.has_name():
189 hash_obj
.update(element
.name())
190 elif element
.has_id():
191 hash_obj
.update(str(element
.id()))
192 hash_bytes
= hash_obj
.digest()[0:2]
193 (hash_int
,) = struct
.unpack('H', hash_bytes
)
195 if hash_int
>= _SCATTER_PROPORTION
:
198 scatter_property
= entity_pb
.Property()
199 scatter_property
.set_name(datastore_types
.SCATTER_SPECIAL_PROPERTY
)
200 scatter_property
.set_meaning(entity_pb
.Property
.BYTESTRING
)
201 scatter_property
.set_multiple(False)
202 property_value
= scatter_property
.mutable_value()
203 property_value
.set_stringvalue(hash_bytes
)
204 return scatter_property
210 _SPECIAL_PROPERTY_MAP
= {
211 datastore_types
.SCATTER_SPECIAL_PROPERTY
: (False, True, _GetScatterProperty
)
215 def GetInvisibleSpecialPropertyNames():
216 """Gets the names of all non user-visible special properties."""
218 for name
, value
in _SPECIAL_PROPERTY_MAP
.items():
219 is_visible
, _
, _
= value
221 invisible_names
.append(name
)
222 return invisible_names
225 def _PrepareSpecialProperties(entity_proto
, is_load
):
226 """Computes special properties for loading or storing.
227 Strips other special properties."""
228 for i
in xrange(entity_proto
.property_size() - 1, -1, -1):
229 if _SPECIAL_PROPERTY_MAP
.has_key(entity_proto
.property(i
).name()):
230 del entity_proto
.property_list()[i
]
232 for is_visible
, is_stored
, property_func
in _SPECIAL_PROPERTY_MAP
.values():
234 should_process
= is_visible
236 should_process
= is_stored
239 special_property
= property_func(entity_proto
)
241 entity_proto
.property_list().append(special_property
)
244 def _GetGroupByKey(entity
, property_names
):
245 """Computes a key value that uniquely identifies the 'group' of an entity.
248 entity: The entity_pb.EntityProto for which to create the group key.
249 property_names: The names of the properties in the group by clause.
252 A hashable value that uniquely identifies the entity's 'group'.
254 return frozenset((prop
.name(), prop
.value().SerializeToString())
255 for prop
in entity
.property_list()
256 if prop
.name() in property_names
)
259 def PrepareSpecialPropertiesForStore(entity_proto
):
260 """Computes special properties for storing.
261 Strips other special properties."""
262 _PrepareSpecialProperties(entity_proto
, False)
265 def LoadEntity(entity
, keys_only
=False, property_names
=None):
266 """Prepares an entity to be returned to the user.
269 entity: a entity_pb.EntityProto or None
270 keys_only: if a keys only result should be produced
271 property_names: if not None or empty, cause a projected entity
272 to be produced with the given properties.
275 A user friendly copy of entity or None.
278 clone
= entity_pb
.EntityProto()
281 clone
.mutable_key().CopyFrom(entity
.key())
282 clone
.mutable_entity_group()
284 for prop
in entity
.property_list():
285 if prop
.name() in property_names
:
287 Check(prop
.name() not in seen
,
288 "datastore dev stub produced bad result",
289 datastore_pb
.Error
.INTERNAL_ERROR
)
290 seen
.add(prop
.name())
291 new_prop
= clone
.add_property()
292 new_prop
.set_name(prop
.name())
293 new_prop
.set_meaning(entity_pb
.Property
.INDEX_VALUE
)
294 new_prop
.mutable_value().CopyFrom(prop
.value())
295 new_prop
.set_multiple(False)
298 clone
.mutable_key().CopyFrom(entity
.key())
299 clone
.mutable_entity_group()
302 clone
.CopyFrom(entity
)
303 PrepareSpecialPropertiesForLoad(clone
)
307 def StoreEntity(entity
):
308 """Prepares an entity for storing.
311 entity: a entity_pb.EntityProto to prepare
314 A copy of entity that should be stored in its place.
316 clone
= entity_pb
.EntityProto()
317 clone
.CopyFrom(entity
)
321 PrepareSpecialPropertiesForStore(clone
)
325 def PrepareSpecialPropertiesForLoad(entity_proto
):
326 """Computes special properties that are user-visible.
327 Strips other special properties."""
328 _PrepareSpecialProperties(entity_proto
, True)
331 def Check(test
, msg
='', error_code
=datastore_pb
.Error
.BAD_REQUEST
):
332 """Raises an apiproxy_errors.ApplicationError if the condition is false.
335 test: A condition to test.
336 msg: A string to return with the error.
337 error_code: One of datastore_pb.Error to use as an error code.
340 apiproxy_errors.ApplicationError: If test is false.
343 raise apiproxy_errors
.ApplicationError(error_code
, msg
)
346 def CheckValidUTF8(string
, desc
):
347 """Check that the given string is valid UTF-8.
350 string: the string to validate.
351 desc: a description of the string being validated.
354 apiproxy_errors.ApplicationError: if the string is not valid UTF-8.
357 string
.decode('utf-8')
358 except UnicodeDecodeError:
359 Check(False, '%s is not valid UTF-8.' % desc
)
362 def CheckAppId(request_trusted
, request_app_id
, app_id
):
363 """Check that this is the stub for app_id.
366 request_trusted: If the request is trusted.
367 request_app_id: The application ID of the app making the request.
368 app_id: An application ID.
371 apiproxy_errors.ApplicationError: if this is not the stub for app_id.
375 CheckValidUTF8(app_id
, "app id");
376 Check(request_trusted
or app_id
== request_app_id
,
377 'app "%s" cannot access app "%s"\'s data' % (request_app_id
, app_id
))
380 def CheckReference(request_trusted
,
383 require_id_or_name
=True):
387 request_trusted: If the request is trusted.
388 request_app_id: The application ID of the app making the request.
389 key: entity_pb.Reference
390 require_id_or_name: Boolean indicating if we should enforce the presence of
391 an id or name in the last element of the key's path.
394 apiproxy_errors.ApplicationError: if the key is invalid
397 assert isinstance(key
, entity_pb
.Reference
)
399 CheckAppId(request_trusted
, request_app_id
, key
.app())
401 Check(key
.path().element_size() > 0, 'key\'s path cannot be empty')
403 if require_id_or_name
:
405 last_element
= key
.path().element_list()[-1]
406 has_id_or_name
= ((last_element
.has_id() and last_element
.id() != 0) or
407 (last_element
.has_name() and last_element
.name() != ""))
408 if not has_id_or_name
:
409 raise datastore_errors
.BadRequestError('missing key id/name')
411 for elem
in key
.path().element_list():
412 Check(not elem
.has_id() or not elem
.has_name(),
413 'each key path element should have id or name but not both: %r' % key
)
414 CheckValidUTF8(elem
.type(), 'key path element type')
416 CheckValidUTF8(elem
.name(), 'key path element name')
419 def CheckEntity(request_trusted
, request_app_id
, entity
):
420 """Check if this entity can be stored.
423 request_trusted: If the request is trusted.
424 request_app_id: The application ID of the app making the request.
425 entity: entity_pb.EntityProto
428 apiproxy_errors.ApplicationError: if the entity is invalid
432 CheckReference(request_trusted
, request_app_id
, entity
.key(), False)
433 for prop
in entity
.property_list():
434 CheckProperty(request_trusted
, request_app_id
, prop
)
435 for prop
in entity
.raw_property_list():
436 CheckProperty(request_trusted
, request_app_id
, prop
, indexed
=False)
439 def CheckProperty(request_trusted
, request_app_id
, prop
, indexed
=True):
440 """Check if this property can be stored.
443 request_trusted: If the request is trusted.
444 request_app_id: The application ID of the app making the request.
445 prop: entity_pb.Property
446 indexed: Whether the property is indexed.
449 apiproxy_errors.ApplicationError: if the property is invalid
453 meaning
= prop
.meaning()
454 CheckValidUTF8(name
, 'property name')
455 Check(request_trusted
or
456 not datastore_types
.RESERVED_PROPERTY_NAME
.match(name
),
457 'cannot store entity with reserved property name \'%s\'' % name
)
458 Check(prop
.meaning() != entity_pb
.Property
.INDEX_VALUE
,
459 'Entities with incomplete properties cannot be written.')
460 is_blob
= meaning
in _BLOB_MEANINGS
463 'BLOB, ENITY_PROTO or TEXT property ' + name
+
464 ' must be in a raw_property field')
465 max_length
= datastore_types
._MAX
_STRING
_LENGTH
468 Check(value
.has_stringvalue(),
469 'BLOB / ENTITY_PROTO / TEXT raw property ' + name
+
470 'must have a string value')
471 max_length
= datastore_types
._MAX
_RAW
_PROPERTY
_BYTES
472 if meaning
== entity_pb
.Property
.ATOM_LINK
:
473 max_length
= datastore_types
._MAX
_LINK
_PROPERTY
_LENGTH
475 CheckPropertyValue(name
, value
, max_length
, meaning
)
478 def CheckPropertyValue(name
, value
, max_length
, meaning
):
479 """Check if this property value can be stored.
482 name: name of the property
483 value: entity_pb.PropertyValue
484 max_length: maximum length for string values
485 meaning: meaning of the property
488 apiproxy_errors.ApplicationError: if the property is invalid
490 num_values
= (value
.has_int64value() +
491 value
.has_stringvalue() +
492 value
.has_booleanvalue() +
493 value
.has_doublevalue() +
494 value
.has_pointvalue() +
495 value
.has_uservalue() +
496 value
.has_referencevalue())
497 Check(num_values
<= 1, 'PropertyValue for ' + name
+
498 ' has multiple value fields set')
500 if value
.has_stringvalue():
508 s16
= value
.stringvalue().decode('utf-8', 'replace').encode('utf-16')
510 Check((len(s16
) - 2) / 2 <= max_length
,
511 'Property %s is too long. Maximum length is %d.' % (name
, max_length
))
512 if (meaning
not in _BLOB_MEANINGS
and
513 meaning
!= entity_pb
.Property
.BYTESTRING
):
514 CheckValidUTF8(value
.stringvalue(), 'String property value')
517 def CheckTransaction(request_trusted
, request_app_id
, transaction
):
518 """Check that this transaction is valid.
521 request_trusted: If the request is trusted.
522 request_app_id: The application ID of the app making the request.
523 transaction: datastore_pb.Transaction
526 apiproxy_errors.ApplicationError: if the transaction is not valid.
528 assert isinstance(transaction
, datastore_pb
.Transaction
)
529 CheckAppId(request_trusted
, request_app_id
, transaction
.app())
532 def CheckQuery(query
, filters
, orders
, max_query_components
):
533 """Check a datastore query with normalized filters, orders.
535 Raises an ApplicationError when any of the following conditions are violated:
536 - transactional queries have an ancestor
537 - queries that are not too large
538 (sum of filters, orders, ancestor <= max_query_components)
539 - ancestor (if any) app and namespace match query app and namespace
540 - kindless queries only filter on __key__ and only sort on __key__ ascending
541 - multiple inequality (<, <=, >, >=) filters all applied to the same property
542 - filters on __key__ compare to a reference in the same app and namespace as
544 - if an inequality filter on prop X is used, the first order (if any) must
548 query: query to validate
549 filters: normalized (by datastore_index.Normalize) filters from query
550 orders: normalized (by datastore_index.Normalize) orders from query
551 max_query_components: limit on query complexity
553 Check(query
.property_name_size() == 0 or not query
.keys_only(),
554 'projection and keys_only cannot both be set')
556 projected_properties
= set(query
.property_name_list())
557 for prop_name
in query
.property_name_list():
558 Check(not datastore_types
.RESERVED_PROPERTY_NAME
.match(prop_name
),
559 'projections are not supported for the property: ' + prop_name
)
560 Check(len(projected_properties
) == len(query
.property_name_list()),
561 "cannot project a property multiple times")
563 key_prop_name
= datastore_types
.KEY_SPECIAL_PROPERTY
564 unapplied_log_timestamp_us_name
= (
565 datastore_types
._UNAPPLIED
_LOG
_TIMESTAMP
_SPECIAL
_PROPERTY
)
567 if query
.has_transaction():
569 Check(query
.has_ancestor(),
570 'Only ancestor queries are allowed inside transactions.')
573 num_components
= len(filters
) + len(orders
)
574 if query
.has_ancestor():
576 Check(num_components
<= max_query_components
,
577 'query is too large. may not have more than %s filters'
578 ' + sort orders ancestor total' % max_query_components
)
581 if query
.has_ancestor():
582 ancestor
= query
.ancestor()
583 Check(query
.app() == ancestor
.app(),
584 'query app is %s but ancestor app is %s' %
585 (query
.app(), ancestor
.app()))
586 Check(query
.name_space() == ancestor
.name_space(),
587 'query namespace is %s but ancestor namespace is %s' %
588 (query
.name_space(), ancestor
.name_space()))
591 if query
.group_by_property_name_size():
592 group_by_set
= set(query
.group_by_property_name_list())
596 Check(order
.property() in group_by_set
,
597 'items in the group by clause must be specified first '
599 group_by_set
.remove(order
.property())
603 ineq_prop_name
= None
604 for filter in filters
:
605 Check(filter.property_size() == 1,
606 'Filter has %d properties, expected 1' % filter.property_size())
608 prop
= filter.property(0)
609 prop_name
= prop
.name().decode('utf-8')
611 if prop_name
== key_prop_name
:
615 Check(prop
.value().has_referencevalue(),
616 '%s filter value must be a Key' % key_prop_name
)
617 ref_val
= prop
.value().referencevalue()
618 Check(ref_val
.app() == query
.app(),
619 '%s filter app is %s but query app is %s' %
620 (key_prop_name
, ref_val
.app(), query
.app()))
621 Check(ref_val
.name_space() == query
.name_space(),
622 '%s filter namespace is %s but query namespace is %s' %
623 (key_prop_name
, ref_val
.name_space(), query
.name_space()))
625 if filter.op() in datastore_index
.EQUALITY_OPERATORS
:
626 Check(prop_name
not in projected_properties
,
627 'cannot use projection on a property with an equality filter')
628 if (filter.op() in datastore_index
.INEQUALITY_OPERATORS
and
629 prop_name
!= unapplied_log_timestamp_us_name
):
630 if ineq_prop_name
is None:
631 ineq_prop_name
= prop_name
633 Check(ineq_prop_name
== prop_name
,
634 'Only one inequality filter per query is supported. '
635 'Encountered both %s and %s' % (ineq_prop_name
, prop_name
))
637 if (ineq_prop_name
is not None
638 and query
.group_by_property_name_size() > 0
641 Check(ineq_prop_name
in group_by_set
,
642 'Inequality filter on %s must also be a group by '
643 'property when group by properties are set.'
646 if ineq_prop_name
is not None and orders
:
648 first_order_prop
= orders
[0].property().decode('utf-8')
649 Check(first_order_prop
== ineq_prop_name
,
650 'The first sort property must be the same as the property '
651 'to which the inequality filter is applied. In your query '
652 'the first sort property is %s but the inequality filter '
653 'is on %s' % (first_order_prop
, ineq_prop_name
))
655 if not query
.has_kind():
657 for filter in filters
:
658 prop_name
= filter.property(0).name().decode('utf-8')
659 Check(prop_name
== key_prop_name
or
660 prop_name
== unapplied_log_timestamp_us_name
,
661 'kind is required for non-__key__ filters')
663 prop_name
= order
.property().decode('utf-8')
664 Check(prop_name
== key_prop_name
and
665 order
.direction() is datastore_pb
.Query_Order
.ASCENDING
,
666 'kind is required for all orders except __key__ ascending')
669 class ValueRange(object):
670 """A range of values defined by its two extremes (inclusive or exclusive)."""
675 Creates an unlimited range.
677 self
.__start
= self
.__end
= None
678 self
.__start
_inclusive
= self
.__end
_inclusive
= False
680 def Update(self
, rel_op
, limit
):
681 """Filter the range by 'rel_op limit'.
684 rel_op: relational operator from datastore_pb.Query_Filter.
685 limit: the value to limit the range by.
688 if rel_op
== datastore_pb
.Query_Filter
.LESS_THAN
:
689 if self
.__end
is None or limit
<= self
.__end
:
691 self
.__end
_inclusive
= False
692 elif (rel_op
== datastore_pb
.Query_Filter
.LESS_THAN_OR_EQUAL
or
693 rel_op
== datastore_pb
.Query_Filter
.EQUAL
):
694 if self
.__end
is None or limit
< self
.__end
:
696 self
.__end
_inclusive
= True
698 if rel_op
== datastore_pb
.Query_Filter
.GREATER_THAN
:
699 if self
.__start
is None or limit
>= self
.__start
:
701 self
.__start
_inclusive
= False
702 elif (rel_op
== datastore_pb
.Query_Filter
.GREATER_THAN_OR_EQUAL
or
703 rel_op
== datastore_pb
.Query_Filter
.EQUAL
):
704 if self
.__start
is None or limit
> self
.__start
:
706 self
.__start
_inclusive
= True
708 def Contains(self
, value
):
709 """Check if the range contains a specific value.
712 value: the value to check.
714 True iff value is contained in this range.
716 if self
.__start
is not None:
717 if self
.__start
_inclusive
and value
< self
.__start
: return False
718 if not self
.__start
_inclusive
and value
<= self
.__start
: return False
719 if self
.__end
is not None:
720 if self
.__end
_inclusive
and value
> self
.__end
: return False
721 if not self
.__end
_inclusive
and value
>= self
.__end
: return False
724 def Remap(self
, mapper
):
725 """Transforms the range extremes with a function.
727 The function mapper must preserve order, i.e.
728 x rel_op y iff mapper(x) rel_op y
731 mapper: function to apply to the range extremes.
733 self
.__start
= self
.__start
and mapper(self
.__start
)
734 self
.__end
= self
.__end
and mapper(self
.__end
)
736 def MapExtremes(self
, mapper
):
737 """Evaluate a function on the range extremes.
740 mapper: function to apply to the range extremes.
742 (x, y) where x = None if the range has no start,
743 mapper(start, start_inclusive, False) otherwise
744 y = None if the range has no end,
745 mapper(end, end_inclusive, True) otherwise
748 self
.__start
and mapper(self
.__start
, self
.__start
_inclusive
, False),
749 self
.__end
and mapper(self
.__end
, self
.__end
_inclusive
, True))
752 def ParseKeyFilteredQuery(filters
, orders
):
753 """Parse queries which only allow filters and ascending-orders on __key__.
755 Raises exceptions for illegal queries.
757 filters: the normalized filters of a query.
758 orders: the normalized orders of a query.
760 The key range (a ValueRange over datastore_types.Key) requested in the
764 remaining_filters
= []
765 key_range
= ValueRange()
766 key_prop
= datastore_types
.KEY_SPECIAL_PROPERTY
769 if not (f
.property_size() == 1 and
770 f
.property(0).name() == key_prop
and
771 not (op
== datastore_pb
.Query_Filter
.IN
or
772 op
== datastore_pb
.Query_Filter
.EXISTS
)):
773 remaining_filters
.append(f
)
776 val
= f
.property(0).value()
777 Check(val
.has_referencevalue(), '__key__ kind must be compared to a key')
778 limit
= datastore_types
.FromReferenceProperty(val
)
779 key_range
.Update(op
, limit
)
782 remaining_orders
= []
784 if not (o
.direction() == datastore_pb
.Query_Order
.ASCENDING
and
785 o
.property() == datastore_types
.KEY_SPECIAL_PROPERTY
):
786 remaining_orders
.append(o
)
792 Check(not remaining_filters
,
793 'Only comparison filters on ' + key_prop
+ ' supported')
794 Check(not remaining_orders
,
795 'Only ascending order on ' + key_prop
+ ' supported')
800 def ParseKindQuery(query
, filters
, orders
):
801 """Parse __kind__ (schema) queries.
803 Raises exceptions for illegal queries.
806 filters: the normalized filters from query.
807 orders: the normalized orders from query.
809 The kind range (a ValueRange over string) requested in the query.
812 Check(not query
.has_ancestor(), 'ancestor queries on __kind__ not allowed')
814 key_range
= ParseKeyFilteredQuery(filters
, orders
)
815 key_range
.Remap(_KindKeyToString
)
820 def _KindKeyToString(key
):
821 """Extract kind name from __kind__ key.
823 Raises an ApplicationError if the key is not of the form '__kind__'/name.
826 key: a key for a __kind__ instance.
828 kind specified by key.
830 key_path
= key
.to_path()
831 if (len(key_path
) == 2 and key_path
[0] == '__kind__' and
832 isinstance(key_path
[1], basestring
)):
834 Check(False, 'invalid Key for __kind__ table')
837 def ParseNamespaceQuery(query
, filters
, orders
):
838 """Parse __namespace__ queries.
840 Raises exceptions for illegal queries.
843 filters: the normalized filters from query.
844 orders: the normalized orders from query.
846 The kind range (a ValueRange over string) requested in the query.
849 Check(not query
.has_ancestor(),
850 'ancestor queries on __namespace__ not allowed')
852 key_range
= ParseKeyFilteredQuery(filters
, orders
)
853 key_range
.Remap(_NamespaceKeyToString
)
858 def _NamespaceKeyToString(key
):
859 """Extract namespace name from __namespace__ key.
861 Raises an ApplicationError if the key is not of the form '__namespace__'/name
862 or '__namespace__'/_EMPTY_NAMESPACE_ID.
865 key: a key for a __namespace__ instance.
867 namespace specified by key.
869 key_path
= key
.to_path()
870 if len(key_path
) == 2 and key_path
[0] == '__namespace__':
871 if key_path
[1] == datastore_types
._EMPTY
_NAMESPACE
_ID
:
873 if isinstance(key_path
[1], basestring
):
875 Check(False, 'invalid Key for __namespace__ table')
878 def ParsePropertyQuery(query
, filters
, orders
):
879 """Parse __property__ queries.
881 Raises exceptions for illegal queries.
884 filters: the normalized filters from query.
885 orders: the normalized orders from query.
887 The kind range (a ValueRange over (kind, property) pairs) requested
891 Check(not query
.has_transaction(),
892 'transactional queries on __property__ not allowed')
894 key_range
= ParseKeyFilteredQuery(filters
, orders
)
895 key_range
.Remap(lambda x
: _PropertyKeyToString(x
, ''))
897 if query
.has_ancestor():
898 ancestor
= datastore_types
.Key
._FromPb
(query
.ancestor())
899 ancestor_kind
, ancestor_property
= _PropertyKeyToString(ancestor
, None)
902 if ancestor_property
is not None:
903 key_range
.Update(datastore_pb
.Query_Filter
.EQUAL
,
904 (ancestor_kind
, ancestor_property
))
907 key_range
.Update(datastore_pb
.Query_Filter
.GREATER_THAN_OR_EQUAL
,
909 key_range
.Update(datastore_pb
.Query_Filter
.LESS_THAN_OR_EQUAL
,
910 (ancestor_kind
+ '\0', ''))
911 query
.clear_ancestor()
916 def _PropertyKeyToString(key
, default_property
):
917 """Extract property name from __property__ key.
919 Raises an ApplicationError if the key is not of the form
920 '__kind__'/kind, '__property__'/property or '__kind__'/kind
923 key: a key for a __property__ instance.
924 default_property: property value to return when key only has a kind.
926 kind, property if key = '__kind__'/kind, '__property__'/property
927 kind, default_property if key = '__kind__'/kind
929 key_path
= key
.to_path()
930 if (len(key_path
) == 2 and
931 key_path
[0] == '__kind__' and isinstance(key_path
[1], basestring
)):
932 return (key_path
[1], default_property
)
933 if (len(key_path
) == 4 and
934 key_path
[0] == '__kind__' and isinstance(key_path
[1], basestring
) and
935 key_path
[2] == '__property__' and isinstance(key_path
[3], basestring
)):
936 return (key_path
[1], key_path
[3])
938 Check(False, 'invalid Key for __property__ table')
941 def SynthesizeUserId(email
):
942 """Return a synthetic user ID from an email address.
944 Note that this is not the same user ID found in the production system.
947 email: An email address.
950 A string userid derived from the email address.
953 user_id_digest
= _MD5_FUNC(email
.lower()).digest()
954 user_id
= '1' + ''.join(['%02d' % ord(x
) for x
in user_id_digest
])[:20]
958 def FillUsersInQuery(filters
):
959 """Fill in a synthetic user ID for all user properties in a set of filters.
962 filters: The normalized filters from query.
964 for filter in filters
:
965 for property in filter.property_list():
969 def FillUser(property):
970 """Fill in a synthetic user ID for a user properties.
973 property: A Property which may have a user value.
975 if property.value().has_uservalue():
976 uid
= SynthesizeUserId(property.value().uservalue().email())
978 property.mutable_value().mutable_uservalue().set_obfuscated_gaiaid(uid
)
981 class BaseCursor(object):
982 """A base query cursor over a list of entities.
985 cursor: the integer cursor.
986 app: the app for which this cursor was created.
987 keys_only: whether the query is keys_only.
990 _next_cursor: the next cursor to allocate.
991 _next_cursor_lock: protects _next_cursor.
994 _next_cursor_lock
= threading
.Lock()
996 def __init__(self
, query
, dsquery
, orders
, index_list
):
1000 query: the query request proto.
1001 dsquery: a datastore_query.Query over query.
1002 orders: the orders of query as returned by _GuessOrders.
1003 index_list: the list of indexes used by the query.
1006 self
.keys_only
= query
.keys_only()
1007 self
.property_names
= set(query
.property_name_list())
1008 self
.group_by
= set(query
.group_by_property_name_list())
1009 self
.app
= query
.app()
1010 self
.cursor
= self
._AcquireCursorID
()
1012 self
.__order
_compare
_entities
= dsquery
._order
.cmp_for_filter(
1013 dsquery
._filter
_predicate
)
1015 self
.__cursor
_properties
= self
.group_by
1017 self
.__cursor
_properties
= set(order
.property() for order
in orders
)
1018 self
.__cursor
_properties
.add('__key__')
1019 self
.__cursor
_properties
= frozenset(self
.__cursor
_properties
)
1021 self
.__first
_sort
_order
= orders
[0].direction()
1022 self
.__index
_list
= index_list
1024 def _PopulateResultMetadata(self
, query_result
, compile,
1025 first_result
, last_result
):
1026 query_result
.set_keys_only(self
.keys_only
)
1027 if query_result
.more_results():
1028 cursor
= query_result
.mutable_cursor()
1029 cursor
.set_app(self
.app
)
1030 cursor
.set_cursor(self
.cursor
)
1032 self
._EncodeCompiledCursor
(last_result
,
1033 query_result
.mutable_compiled_cursor())
1035 query_result
.index_list().extend(self
.__index
_list
)
1038 def _AcquireCursorID(cls
):
1039 """Acquires the next cursor id in a thread safe manner."""
1040 cls
._next
_cursor
_lock
.acquire()
1042 cursor_id
= cls
._next
_cursor
1043 cls
._next
_cursor
+= 1
1045 cls
._next
_cursor
_lock
.release()
1048 def _IsBeforeCursor(self
, entity
, cursor
):
1049 """True if entity is before cursor according to the current order.
1052 entity: a entity_pb.EntityProto entity.
1053 cursor: a compiled cursor as returned by _DecodeCompiledCursor.
1055 comparison_entity
= entity_pb
.EntityProto()
1056 for prop
in entity
.property_list():
1057 if prop
.name() in self
.__cursor
_properties
:
1058 comparison_entity
.add_property().MergeFrom(prop
)
1059 if cursor
[0].has_key():
1060 comparison_entity
.mutable_key().MergeFrom(entity
.key())
1061 x
= self
.__order
_compare
_entities
(comparison_entity
, cursor
[0])
1067 def _DecodeCompiledCursor(self
, compiled_cursor
):
1068 """Converts a compiled_cursor into a cursor_entity.
1071 compiled_cursor: The datastore_pb.CompiledCursor to decode.
1074 (cursor_entity, inclusive): a entity_pb.EntityProto and if it should
1075 be included in the result set.
1077 assert compiled_cursor
.has_position()
1079 position
= compiled_cursor
.position()
1084 remaining_properties
= set(self
.__cursor
_properties
)
1086 cursor_entity
= entity_pb
.EntityProto()
1087 if position
.has_key():
1088 cursor_entity
.mutable_key().CopyFrom(position
.key())
1090 remaining_properties
.remove('__key__')
1092 Check(False, 'Cursor does not match query: extra value __key__')
1093 for indexvalue
in position
.indexvalue_list():
1094 property = cursor_entity
.add_property()
1095 property.set_name(indexvalue
.property())
1096 property.mutable_value().CopyFrom(indexvalue
.value())
1098 remaining_properties
.remove(indexvalue
.property())
1100 Check(False, 'Cursor does not match query: extra value %s' %
1101 indexvalue
.property())
1102 Check(not remaining_properties
,
1103 'Cursor does not match query: missing values for %r' %
1104 remaining_properties
)
1108 return (cursor_entity
, position
.start_inclusive())
1110 def _EncodeCompiledCursor(self
, last_result
, compiled_cursor
):
1111 """Converts the current state of the cursor into a compiled_cursor.
1114 last_result: the last result returned by this query.
1115 compiled_cursor: an empty datstore_pb.CompiledCursor.
1117 if last_result
is not None:
1120 position
= compiled_cursor
.mutable_position()
1123 if '__key__' in self
.__cursor
_properties
:
1124 position
.mutable_key().MergeFrom(last_result
.key())
1125 for prop
in last_result
.property_list():
1126 if prop
.name() in self
.__cursor
_properties
:
1127 indexvalue
= position
.add_indexvalue()
1128 indexvalue
.set_property(prop
.name())
1129 indexvalue
.mutable_value().CopyFrom(prop
.value())
1130 position
.set_start_inclusive(False)
1131 _SetBeforeAscending(position
, self
.__first
_sort
_order
)
1134 class ListCursor(BaseCursor
):
1135 """A query cursor over a list of entities.
1138 keys_only: whether the query is keys_only
1141 def __init__(self
, query
, dsquery
, orders
, index_list
, results
):
1145 query: the query request proto
1146 dsquery: a datastore_query.Query over query.
1147 orders: the orders of query as returned by _GuessOrders.
1148 index_list: the list of indexes used by the query.
1149 results: list of entity_pb.EntityProto
1151 super(ListCursor
, self
).__init
__(query
, dsquery
, orders
, index_list
)
1157 for result
in results
:
1158 key_value
= _GetGroupByKey(result
, self
.group_by
)
1159 if key_value
not in distincts
:
1160 distincts
.add(key_value
)
1161 new_results
.append(result
)
1162 results
= new_results
1164 if query
.has_compiled_cursor() and query
.compiled_cursor().has_position():
1165 start_cursor
= self
._DecodeCompiledCursor
(query
.compiled_cursor())
1166 self
.__last
_result
= start_cursor
[0]
1167 start_cursor_position
= self
._GetCursorOffset
(results
, start_cursor
)
1169 self
.__last
_result
= None
1170 start_cursor_position
= 0
1172 if query
.has_end_compiled_cursor():
1173 if query
.end_compiled_cursor().has_position():
1174 end_cursor
= self
._DecodeCompiledCursor
(query
.end_compiled_cursor())
1175 end_cursor_position
= self
._GetCursorOffset
(results
, end_cursor
)
1177 end_cursor_position
= 0
1179 end_cursor_position
= len(results
)
1182 results
= results
[start_cursor_position
:end_cursor_position
]
1185 if query
.has_limit():
1186 limit
= query
.limit()
1188 limit
+= query
.offset()
1189 if limit
>= 0 and limit
< len(results
):
1190 results
= results
[:limit
]
1192 self
.__results
= results
1194 self
.__count
= len(self
.__results
)
1196 def _GetCursorOffset(self
, results
, cursor
):
1197 """Converts a cursor into a offset into the result set even if the
1198 cursor's entity no longer exists.
1201 results: the query's results (sequence of entity_pb.EntityProto)
1202 cursor: a compiled cursor as returned by _DecodeCompiledCursor
1209 mid
= (lo
+ hi
) // 2
1210 if self
._IsBeforeCursor
(results
[mid
], cursor
):
1216 def PopulateQueryResult(self
, result
, count
, offset
,
1217 compile=False, first_result
=False):
1218 """Populates a QueryResult with this cursor and the given number of results.
1221 result: datastore_pb.QueryResult
1222 count: integer of how many results to return
1223 offset: integer of how many results to skip
1224 compile: boolean, whether we are compiling this query
1225 first_result: whether the query result is the first for this query
1227 Check(offset
>= 0, 'Offset must be >= 0')
1229 offset
= min(offset
, self
.__count
- self
.__offset
)
1230 limited_offset
= min(offset
, _MAX_QUERY_OFFSET
)
1232 self
.__offset
+= limited_offset
1233 result
.set_skipped_results(limited_offset
)
1235 if compile and result
.skipped_results() > 0:
1236 self
._EncodeCompiledCursor
(self
.__results
[self
.__offset
- 1],
1237 result
.mutable_skipped_results_compiled_cursor())
1238 if offset
== limited_offset
and count
:
1240 if count
> _MAXIMUM_RESULTS
:
1241 count
= _MAXIMUM_RESULTS
1242 results
= self
.__results
[self
.__offset
:self
.__offset
+ count
]
1243 count
= len(results
)
1244 self
.__offset
+= count
1250 result
.result_list().extend(
1251 LoadEntity(entity
, self
.keys_only
, self
.property_names
)
1252 for entity
in results
)
1254 for entity
in results
:
1255 self
._EncodeCompiledCursor
(entity
,
1256 result
.add_result_compiled_cursor())
1260 self
.__last
_result
= self
.__results
[self
.__offset
- 1]
1262 result
.set_more_results(self
.__offset
< self
.__count
)
1263 self
._PopulateResultMetadata
(result
, compile,
1264 first_result
, self
.__last
_result
)
1267 def _SynchronizeTxn(function
):
1268 """A decorator that locks a transaction during the function call."""
1270 def sync(txn
, *args
, **kwargs
):
1275 Check(txn
._state
is LiveTxn
.ACTIVE
, 'transaction closed')
1277 return function(txn
, *args
, **kwargs
)
1284 def _GetEntityGroup(ref
):
1285 """Returns the entity group key for the given reference."""
1286 entity_group
= entity_pb
.Reference()
1287 entity_group
.CopyFrom(ref
)
1288 assert (entity_group
.path().element_list()[0].has_id() or
1289 entity_group
.path().element_list()[0].has_name())
1290 del entity_group
.path().element_list()[1:]
1294 def _GetKeyKind(key
):
1295 """Return the kind of the given key."""
1296 return key
.path().element_list()[-1].type()
1299 def _FilterIndexesByKind(key
, indexes
):
1300 """Return only the indexes with the specified kind."""
1301 return filter((lambda index
:
1302 index
.definition().entity_type() == _GetKeyKind(key
)), indexes
)
1305 class LiveTxn(object):
1306 """An in flight transaction."""
1331 _commit_time_s
= None
1333 def __init__(self
, txn_manager
, app
, allow_multiple_eg
):
1334 assert isinstance(txn_manager
, BaseTransactionManager
)
1335 assert isinstance(app
, basestring
)
1337 self
._txn
_manager
= txn_manager
1339 self
._allow
_multiple
_eg
= allow_multiple_eg
1342 self
._entity
_groups
= {}
1344 self
._lock
= threading
.RLock()
1345 self
._apply
_lock
= threading
.Lock()
1348 self
._cost
= datastore_pb
.Cost()
1354 self
._kind
_to
_indexes
= collections
.defaultdict(list)
1356 def _GetTracker(self
, reference
):
1357 """Gets the entity group tracker for reference.
1359 If this is the first time reference's entity group is seen, creates a new
1360 tracker, checking that the transaction doesn't exceed the entity group
1363 entity_group
= _GetEntityGroup(reference
)
1364 key
= datastore_types
.ReferenceToKeyValue(entity_group
)
1365 tracker
= self
._entity
_groups
.get(key
, None)
1367 Check(self
._app
== reference
.app(),
1368 'Transactions cannot span applications (expected %s, got %s)' %
1369 (self
._app
, reference
.app()))
1370 if self
._allow
_multiple
_eg
:
1371 Check(len(self
._entity
_groups
) < _MAX_EG_PER_TXN
,
1372 'operating on too many entity groups in a single transaction.')
1374 Check(len(self
._entity
_groups
) < 1,
1375 "cross-groups transaction need to be explicitly "
1376 "specified (xg=True)")
1377 tracker
= EntityGroupTracker(entity_group
)
1378 self
._entity
_groups
[key
] = tracker
1382 def _GetAllTrackers(self
):
1383 """Get the trackers for the transaction's entity groups.
1385 If no entity group has been discovered returns a 'global' entity group
1386 tracker. This is possible if the txn only contains transactional tasks.
1389 The tracker list for the entity groups used in this txn.
1391 if not self
._entity
_groups
:
1392 self
._GetTracker
(datastore_types
.Key
.from_path(
1393 '__global__', 1, _app
=self
._app
)._ToPb
())
1394 return self
._entity
_groups
.values()
1396 def _GrabSnapshot(self
, reference
):
1397 """Gets snapshot for this reference, creating it if necessary.
1399 If no snapshot has been set for reference's entity group, a snapshot is
1400 taken and stored for future reads (this also sets the read position),
1401 and a CONCURRENT_TRANSACTION exception is thrown if we no longer have
1402 a consistent snapshot.
1405 reference: A entity_pb.Reference from which to extract the entity group.
1407 apiproxy_errors.ApplicationError if the snapshot is not consistent.
1409 tracker
= self
._GetTracker
(reference
)
1410 check_contention
= tracker
._snapshot
is None
1411 snapshot
= tracker
._GrabSnapshot
(self
._txn
_manager
)
1412 if check_contention
:
1418 candidates
= [other
for other
in self
._entity
_groups
.values()
1419 if other
._snapshot
is not None and other
!= tracker
]
1420 meta_data_list
= [other
._meta
_data
for other
in candidates
]
1421 self
._txn
_manager
._AcquireWriteLocks
(meta_data_list
)
1423 for other
in candidates
:
1424 if other
._meta
_data
._log
_pos
!= other
._read
_pos
:
1425 self
._state
= self
.FAILED
1426 raise apiproxy_errors
.ApplicationError(
1427 datastore_pb
.Error
.CONCURRENT_TRANSACTION
,
1428 'Concurrency exception.')
1430 self
._txn
_manager
._ReleaseWriteLocks
(meta_data_list
)
1434 def Get(self
, reference
):
1435 """Returns the entity associated with the given entity_pb.Reference or None.
1437 Does not see any modifications in the current txn.
1440 reference: The entity_pb.Reference of the entity to look up.
1443 The associated entity_pb.EntityProto or None if no such entity exists.
1445 snapshot
= self
._GrabSnapshot
(reference
)
1446 entity
= snapshot
.get(datastore_types
.ReferenceToKeyValue(reference
))
1447 return LoadEntity(entity
)
1450 def GetQueryCursor(self
, query
, filters
, orders
, index_list
,
1451 filter_predicate
=None):
1452 """Runs the given datastore_pb.Query and returns a QueryCursor for it.
1454 Does not see any modifications in the current txn.
1457 query: The datastore_pb.Query to run.
1458 filters: A list of filters that override the ones found on query.
1459 orders: A list of orders that override the ones found on query.
1460 index_list: A list of indexes used by the query.
1461 filter_predicate: an additional filter of type
1462 datastore_query.FilterPredicate. This is passed along to implement V4
1463 specific filters without changing the entire stub.
1466 A BaseCursor that can be used to fetch query results.
1468 Check(query
.has_ancestor(),
1469 'Query must have an ancestor when performed in a transaction.')
1470 snapshot
= self
._GrabSnapshot
(query
.ancestor())
1471 return _ExecuteQuery(snapshot
.values(), query
, filters
, orders
, index_list
,
1475 def Put(self
, entity
, insert
, indexes
):
1476 """Puts the given entity.
1479 entity: The entity_pb.EntityProto to put.
1480 insert: A boolean that indicates if we should fail if the entity already
1482 indexes: The composite indexes that apply to the entity.
1484 tracker
= self
._GetTracker
(entity
.key())
1485 key
= datastore_types
.ReferenceToKeyValue(entity
.key())
1486 tracker
._delete
.pop(key
, None)
1487 tracker
._put
[key
] = (entity
, insert
)
1488 self
._kind
_to
_indexes
[_GetKeyKind(entity
.key())] = indexes
1491 def Delete(self
, reference
, indexes
):
1492 """Deletes the entity associated with the given reference.
1495 reference: The entity_pb.Reference of the entity to delete.
1496 indexes: The composite indexes that apply to the entity.
1498 tracker
= self
._GetTracker
(reference
)
1499 key
= datastore_types
.ReferenceToKeyValue(reference
)
1500 tracker
._put
.pop(key
, None)
1501 tracker
._delete
[key
] = reference
1502 self
._kind
_to
_indexes
[_GetKeyKind(reference
)] = indexes
1505 def AddActions(self
, actions
, max_actions
=None):
1506 """Adds the given actions to the current txn.
1509 actions: A list of pbs to send to taskqueue.Add when the txn is applied.
1510 max_actions: A number that indicates the maximum number of actions to
1513 Check(not max_actions
or len(self
._actions
) + len(actions
) <= max_actions
,
1514 'Too many messages, maximum allowed %s' % max_actions
)
1515 self
._actions
.extend(actions
)
1518 """Rollback the current txn."""
1520 self
._lock
.acquire()
1522 Check(self
._state
is self
.ACTIVE
or self
._state
is self
.FAILED
,
1523 'transaction closed')
1524 self
._state
= self
.ROLLEDBACK
1526 self
._txn
_manager
._RemoveTxn
(self
)
1528 self
._lock
.release()
1532 """Commits the current txn.
1534 This function hands off the responsibility of calling _Apply to the owning
1538 The cost of the transaction.
1542 trackers
= self
._GetAllTrackers
()
1544 for tracker
in trackers
:
1545 snapshot
= tracker
._GrabSnapshot
(self
._txn
_manager
)
1546 empty
= empty
and not tracker
._put
and not tracker
._delete
1549 for entity
, insert
in tracker
._put
.itervalues():
1550 Check(not insert
or self
.Get(entity
.key()) is None,
1551 'the id allocated for a new entity was already '
1552 'in use, please try again')
1555 key
= datastore_types
.ReferenceToKeyValue(entity
.key())
1557 old_entity
= snapshot
[key
]
1558 self
._AddWriteOps
(old_entity
, entity
)
1560 for reference
in tracker
._delete
.itervalues():
1564 key
= datastore_types
.ReferenceToKeyValue(reference
)
1566 old_entity
= snapshot
[key
]
1567 if old_entity
is not None:
1568 self
._AddWriteOps
(None, old_entity
)
1571 if empty
and not self
._actions
:
1573 return datastore_pb
.Cost()
1576 meta_data_list
= [tracker
._meta
_data
for tracker
in trackers
]
1577 self
._txn
_manager
._AcquireWriteLocks
(meta_data_list
)
1585 for tracker
in trackers
:
1586 Check(tracker
._meta
_data
._log
_pos
== tracker
._read
_pos
,
1587 'Concurrency exception.',
1588 datastore_pb
.Error
.CONCURRENT_TRANSACTION
)
1591 for tracker
in trackers
:
1592 tracker
._meta
_data
.Log(self
)
1593 self
._state
= self
.COMMITED
1594 self
._commit
_time
_s
= time
.time()
1601 for action
in self
._actions
:
1603 apiproxy_stub_map
.MakeSyncCall(
1604 'taskqueue', 'Add', action
, api_base_pb
.VoidProto())
1605 except apiproxy_errors
.ApplicationError
, e
:
1606 logging
.warning('Transactional task %s has been dropped, %s',
1610 self
._txn
_manager
._RemoveTxn
(self
)
1612 self
._txn
_manager
._ReleaseWriteLocks
(meta_data_list
)
1615 self
._txn
_manager
._consistency
_policy
._OnCommit
(self
)
1618 def _AddWriteOps(self
, old_entity
, new_entity
):
1619 """Adds the cost of writing the new_entity to the _cost member.
1621 We assume that old_entity represents the current state of the Datastore.
1624 old_entity: Entity representing the current state in the Datstore.
1625 new_entity: Entity representing the desired state in the Datstore.
1627 composite_indexes
= self
._kind
_to
_indexes
[_GetKeyKind(new_entity
.key())]
1628 entity_writes
, index_writes
= _CalculateWriteOps(
1629 composite_indexes
, old_entity
, new_entity
)
1630 _UpdateCost(self
._cost
, entity_writes
, index_writes
)
1632 def _Apply(self
, meta_data
):
1633 """Applies the current txn on the given entity group.
1635 This function blindly performs the operations contained in the current txn.
1636 The calling function must acquire the entity group write lock and ensure
1637 transactions are applied in order.
1640 self
._apply
_lock
.acquire()
1643 assert self
._state
== self
.COMMITED
1644 for tracker
in self
._entity
_groups
.values():
1645 if tracker
._meta
_data
is meta_data
:
1649 assert tracker
._read
_pos
!= tracker
.APPLIED
1652 for entity
, insert
in tracker
._put
.itervalues():
1653 self
._txn
_manager
._Put
(entity
, insert
)
1656 for key
in tracker
._delete
.itervalues():
1657 self
._txn
_manager
._Delete
(key
)
1661 tracker
._read
_pos
= EntityGroupTracker
.APPLIED
1664 tracker
._meta
_data
.Unlog(self
)
1666 self
._apply
_lock
.release()
1669 class EntityGroupTracker(object):
1670 """An entity group involved a transaction."""
1686 def __init__(self
, entity_group
):
1687 self
._entity
_group
= entity_group
1691 def _GrabSnapshot(self
, txn_manager
):
1692 """Snapshot this entity group, remembering the read position."""
1693 if self
._snapshot
is None:
1694 self
._meta
_data
, self
._read
_pos
, self
._snapshot
= (
1695 txn_manager
._GrabSnapshot
(self
._entity
_group
))
1696 return self
._snapshot
1699 class EntityGroupMetaData(object):
1700 """The meta_data assoicated with an entity group."""
1707 def __init__(self
, entity_group
):
1708 self
._entity
_group
= entity_group
1709 self
._write
_lock
= threading
.Lock()
1710 self
._apply
_queue
= []
1713 """Applies all outstanding txns."""
1715 assert self
._write
_lock
.acquire(False) is False
1717 while self
._apply
_queue
:
1718 self
._apply
_queue
[0]._Apply
(self
)
1721 """Add a pending transaction to this entity group.
1723 Requires that the caller hold the meta data lock.
1724 This also increments the current log position and clears the snapshot cache.
1727 assert self
._write
_lock
.acquire(False) is False
1728 self
._apply
_queue
.append(txn
)
1730 self
._snapshot
= None
1732 def Unlog(self
, txn
):
1733 """Remove the first pending transaction from the apply queue.
1735 Requires that the caller hold the meta data lock.
1736 This checks that the first pending transaction is indeed txn.
1739 assert self
._write
_lock
.acquire(False) is False
1741 Check(self
._apply
_queue
and self
._apply
_queue
[0] is txn
,
1742 'Transaction is not appliable',
1743 datastore_pb
.Error
.INTERNAL_ERROR
)
1744 self
._apply
_queue
.pop(0)
1747 class BaseConsistencyPolicy(object):
1748 """A base class for a consistency policy to be used with a transaction manger.
1753 def _OnCommit(self
, txn
):
1754 """Called after a LiveTxn has been commited.
1756 This function can decide whether to apply the txn right away.
1759 txn: A LiveTxn that has been commited
1761 raise NotImplementedError
1763 def _OnGroom(self
, meta_data_list
):
1764 """Called once for every global query.
1766 This function must aqcuire the write lock for any meta data before applying
1767 any outstanding txns.
1770 meta_data_list: A list of EntityGroupMetaData objects.
1772 raise NotImplementedError
1775 class MasterSlaveConsistencyPolicy(BaseConsistencyPolicy
):
1776 """Enforces the Master / Slave consistency policy.
1778 Applies all txn on commit.
1781 def _OnCommit(self
, txn
):
1783 for tracker
in txn
._GetAllTrackers
():
1784 tracker
._meta
_data
._write
_lock
.acquire()
1786 tracker
._meta
_data
.CatchUp()
1788 tracker
._meta
_data
._write
_lock
.release()
1793 txn
._txn
_manager
.Write()
1795 def _OnGroom(self
, meta_data_list
):
1801 class BaseHighReplicationConsistencyPolicy(BaseConsistencyPolicy
):
1802 """A base class for High Replication Datastore consistency policies.
1804 All txn are applied asynchronously.
1807 def _OnCommit(self
, txn
):
1810 def _OnGroom(self
, meta_data_list
):
1813 for meta_data
in meta_data_list
:
1814 if not meta_data
._apply
_queue
:
1818 meta_data
._write
_lock
.acquire()
1820 while meta_data
._apply
_queue
:
1821 txn
= meta_data
._apply
_queue
[0]
1822 if self
._ShouldApply
(txn
, meta_data
):
1823 txn
._Apply
(meta_data
)
1827 meta_data
._write
_lock
.release()
1829 def _ShouldApply(self
, txn
, meta_data
):
1830 """Determins if the given transaction should be applied."""
1831 raise NotImplementedError
1834 class TimeBasedHRConsistencyPolicy(BaseHighReplicationConsistencyPolicy
):
1835 """A High Replication Datastore consiseny policy based on elapsed time.
1837 This class tries to simulate performance seen in the high replication
1838 datastore using estimated probabilities of a transaction commiting after a
1839 given amount of time.
1842 _classification_map
= [(.98, 100),
1848 def SetClassificationMap(self
, classification_map
):
1849 """Set the probability a txn will be applied after a given amount of time.
1852 classification_map: A list of tuples containing (float between 0 and 1,
1853 number of miliseconds) that define the probability of a transaction
1854 applying after a given amount of time.
1856 for prob
, delay
in classification_map
:
1857 if prob
< 0 or prob
> 1 or delay
<= 0:
1859 'classification_map must be a list of (probability, delay) tuples, '
1860 'found %r' % (classification_map
,))
1862 self
._classification
_map
= sorted(classification_map
)
1864 def _ShouldApplyImpl(self
, elapsed_ms
, classification
):
1865 for rate
, ms
in self
._classification
_map
:
1866 if classification
<= rate
:
1868 return elapsed_ms
>= ms
1870 def _Classify(self
, txn
, meta_data
):
1871 return random
.Random(id(txn
) ^
id(meta_data
)).random()
1873 def _ShouldApply(self
, txn
, meta_data
):
1874 elapsed_ms
= (time
.time() - txn
._commit
_time
_s
) * 1000
1875 classification
= self
._Classify
(txn
, meta_data
)
1876 return self
._ShouldApplyImpl
(elapsed_ms
, classification
)
1879 class PseudoRandomHRConsistencyPolicy(BaseHighReplicationConsistencyPolicy
):
1880 """A policy that always gives the same sequence of consistency decisions."""
1882 def __init__(self
, probability
=.5, seed
=0):
1886 probability: A number between 0 and 1 that is the likelihood of a
1887 transaction applying before a global query is executed.
1888 seed: A hashable object to use as a seed. Use None to use the current
1891 self
.SetProbability(probability
)
1894 def SetProbability(self
, probability
):
1895 """Change the probability of a transaction applying.
1898 probability: A number between 0 and 1 that determins the probability of a
1899 transaction applying before a global query is run.
1901 if probability
< 0 or probability
> 1:
1902 raise TypeError('probability must be a number between 0 and 1, found %r' %
1904 self
._probability
= probability
1906 def SetSeed(self
, seed
):
1907 """Reset the seed."""
1908 self
._random
= random
.Random(seed
)
1910 def _ShouldApply(self
, txn
, meta_data
):
1911 return self
._random
.random() < self
._probability
1914 class BaseTransactionManager(object):
1915 """A class that manages the state of transactions.
1917 This includes creating consistent snap shots for transactions.
1920 def __init__(self
, consistency_policy
=None):
1921 super(BaseTransactionManager
, self
).__init
__()
1923 self
._consistency
_policy
= (consistency_policy
or
1924 MasterSlaveConsistencyPolicy())
1927 self
._meta
_data
_lock
= threading
.Lock()
1928 BaseTransactionManager
.Clear(self
)
1930 def SetConsistencyPolicy(self
, policy
):
1931 """Set the consistency to use.
1933 Causes all data to be flushed.
1936 policy: A obj inheriting from BaseConsistencyPolicy.
1938 if not isinstance(policy
, BaseConsistencyPolicy
):
1939 raise TypeError('policy should be of type '
1940 'datastore_stub_util.BaseConsistencyPolicy found %r.' %
1943 self
._consistency
_policy
= policy
1946 """Discards any pending transactions and resets the meta data."""
1948 self
._meta
_data
= {}
1952 def BeginTransaction(self
, app
, allow_multiple_eg
):
1953 """Start a transaction on the given app.
1956 app: A string representing the app for which to start the transaction.
1957 allow_multiple_eg: True if transactions can span multiple entity groups.
1960 A datastore_pb.Transaction for the created transaction
1962 Check(not (allow_multiple_eg
and isinstance(
1963 self
._consistency
_policy
, MasterSlaveConsistencyPolicy
)),
1964 'transactions on multiple entity groups only allowed with the '
1965 'High Replication datastore')
1966 txn
= self
._BeginTransaction
(app
, allow_multiple_eg
)
1967 self
._txn
_map
[id(txn
)] = txn
1968 transaction
= datastore_pb
.Transaction()
1969 transaction
.set_app(app
)
1970 transaction
.set_handle(id(txn
))
1973 def GetTxn(self
, transaction
, request_trusted
, request_app
):
1974 """Gets the LiveTxn object associated with the given transaction.
1977 transaction: The datastore_pb.Transaction to look up.
1978 request_trusted: A boolean indicating If the requesting app is trusted.
1979 request_app: A string representing the app making the request.
1982 The associated LiveTxn object.
1984 request_app
= datastore_types
.ResolveAppId(request_app
)
1985 CheckTransaction(request_trusted
, request_app
, transaction
)
1986 txn
= self
._txn
_map
.get(transaction
.handle())
1987 Check(txn
and txn
._app
== transaction
.app(),
1988 'Transaction(<%s>) not found' % str(transaction
).replace('\n', ', '))
1992 """Attempts to apply any outstanding transactions.
1994 The consistency policy determins if a transaction should be applied.
1996 self
._meta
_data
_lock
.acquire()
1998 self
._consistency
_policy
._OnGroom
(self
._meta
_data
.itervalues())
2000 self
._meta
_data
_lock
.release()
2003 """Applies all outstanding transactions."""
2004 self
._meta
_data
_lock
.acquire()
2006 for meta_data
in self
._meta
_data
.itervalues():
2007 if not meta_data
._apply
_queue
:
2011 meta_data
._write
_lock
.acquire()
2015 meta_data
._write
_lock
.release()
2017 self
._meta
_data
_lock
.release()
2019 def _GetMetaData(self
, entity_group
):
2020 """Safely gets the EntityGroupMetaData object for the given entity_group.
2022 self
._meta
_data
_lock
.acquire()
2024 key
= datastore_types
.ReferenceToKeyValue(entity_group
)
2026 meta_data
= self
._meta
_data
.get(key
, None)
2028 meta_data
= EntityGroupMetaData(entity_group
)
2029 self
._meta
_data
[key
] = meta_data
2032 self
._meta
_data
_lock
.release()
2034 def _BeginTransaction(self
, app
, allow_multiple_eg
):
2035 """Starts a transaction without storing it in the txn_map."""
2036 return LiveTxn(self
, app
, allow_multiple_eg
)
2038 def _GrabSnapshot(self
, entity_group
):
2039 """Grabs a consistent snapshot of the given entity group.
2042 entity_group: A entity_pb.Reference of the entity group of which the
2043 snapshot should be taken.
2046 A tuple of (meta_data, log_pos, snapshot) where log_pos is the current log
2047 position and snapshot is a map of reference key value to
2048 entity_pb.EntityProto.
2051 meta_data
= self
._GetMetaData
(entity_group
)
2052 meta_data
._write
_lock
.acquire()
2054 if not meta_data
._snapshot
:
2057 meta_data
._snapshot
= self
._GetEntitiesInEntityGroup
(entity_group
)
2058 return meta_data
, meta_data
._log
_pos
, meta_data
._snapshot
2061 meta_data
._write
_lock
.release()
2063 def _AcquireWriteLocks(self
, meta_data_list
):
2064 """Acquire the write locks for the given entity group meta data.
2066 These locks must be released with _ReleaseWriteLock before returning to the
2070 meta_data_list: list of EntityGroupMetaData objects.
2072 for meta_data
in sorted(meta_data_list
):
2073 meta_data
._write
_lock
.acquire()
2075 def _ReleaseWriteLocks(self
, meta_data_list
):
2076 """Release the write locks of the given entity group meta data.
2079 meta_data_list: list of EntityGroupMetaData objects.
2081 for meta_data
in sorted(meta_data_list
):
2082 meta_data
._write
_lock
.release()
2084 def _RemoveTxn(self
, txn
):
2085 """Removes a LiveTxn from the txn_map (if present)."""
2086 self
._txn
_map
.pop(id(txn
), None)
2088 def _Put(self
, entity
, insert
):
2089 """Put the given entity.
2091 This must be implemented by a sub-class. The sub-class can assume that any
2092 need consistency is enforced at a higher level (and can just put blindly).
2095 entity: The entity_pb.EntityProto to put.
2096 insert: A boolean that indicates if we should fail if the entity already
2099 raise NotImplementedError
2101 def _Delete(self
, reference
):
2102 """Delete the entity associated with the specified reference.
2104 This must be implemented by a sub-class. The sub-class can assume that any
2105 need consistency is enforced at a higher level (and can just delete
2109 reference: The entity_pb.Reference of the entity to delete.
2111 raise NotImplementedError
2113 def _GetEntitiesInEntityGroup(self
, entity_group
):
2114 """Gets the contents of a specific entity group.
2116 This must be implemented by a sub-class. The sub-class can assume that any
2117 need consistency is enforced at a higher level (and can just blindly read).
2119 Other entity groups may be modified concurrently.
2122 entity_group: A entity_pb.Reference of the entity group to get.
2125 A dict mapping datastore_types.ReferenceToKeyValue(key) to EntityProto
2127 raise NotImplementedError
2130 class BaseIndexManager(object):
2131 """A generic index manager that stores all data in memory."""
2140 WRITE_ONLY
= entity_pb
.CompositeIndex
.WRITE_ONLY
2141 READ_WRITE
= entity_pb
.CompositeIndex
.READ_WRITE
2142 DELETED
= entity_pb
.CompositeIndex
.DELETED
2143 ERROR
= entity_pb
.CompositeIndex
.ERROR
2145 _INDEX_STATE_TRANSITIONS
= {
2146 WRITE_ONLY
: frozenset((READ_WRITE
, DELETED
, ERROR
)),
2147 READ_WRITE
: frozenset((DELETED
,)),
2148 ERROR
: frozenset((DELETED
,)),
2149 DELETED
: frozenset((ERROR
,)),
2156 self
.__indexes
= collections
.defaultdict(list)
2157 self
.__indexes
_lock
= threading
.Lock()
2158 self
.__next
_index
_id
= 1
2159 self
.__index
_id
_lock
= threading
.Lock()
2161 def __FindIndex(self
, index
):
2162 """Finds an existing index by definition.
2165 index: entity_pb.CompositeIndex
2168 entity_pb.CompositeIndex, if it exists; otherwise None
2170 app
= index
.app_id()
2171 if app
in self
.__indexes
:
2172 for stored_index
in self
.__indexes
[app
]:
2173 if index
.definition() == stored_index
.definition():
2178 def CreateIndex(self
, index
, trusted
=False, calling_app
=None):
2181 calling_app
= datastore_types
.ResolveAppId(calling_app
)
2182 CheckAppId(trusted
, calling_app
, index
.app_id())
2183 Check(index
.id() == 0, 'New index id must be 0.')
2184 Check(not self
.__FindIndex
(index
), 'Index already exists.')
2187 self
.__index
_id
_lock
.acquire()
2188 index
.set_id(self
.__next
_index
_id
)
2189 self
.__next
_index
_id
+= 1
2190 self
.__index
_id
_lock
.release()
2193 clone
= entity_pb
.CompositeIndex()
2194 clone
.CopyFrom(index
)
2195 app
= index
.app_id()
2196 clone
.set_app_id(app
)
2199 self
.__indexes
_lock
.acquire()
2201 self
.__indexes
[app
].append(clone
)
2203 self
.__indexes
_lock
.release()
2205 self
._OnIndexChange
(index
.app_id())
2209 def GetIndexes(self
, app
, trusted
=False, calling_app
=None):
2210 """Get the CompositeIndex objects for the given app."""
2211 calling_app
= datastore_types
.ResolveAppId(calling_app
)
2212 CheckAppId(trusted
, calling_app
, app
)
2214 return self
.__indexes
[app
]
2216 def UpdateIndex(self
, index
, trusted
=False, calling_app
=None):
2217 CheckAppId(trusted
, calling_app
, index
.app_id())
2219 stored_index
= self
.__FindIndex
(index
)
2220 Check(stored_index
, 'Index does not exist.')
2221 Check(index
.state() == stored_index
.state() or
2222 index
.state() in self
._INDEX
_STATE
_TRANSITIONS
[stored_index
.state()],
2223 'cannot move index state from %s to %s' %
2224 (entity_pb
.CompositeIndex
.State_Name(stored_index
.state()),
2225 (entity_pb
.CompositeIndex
.State_Name(index
.state()))))
2228 self
.__indexes
_lock
.acquire()
2230 stored_index
.set_state(index
.state())
2232 self
.__indexes
_lock
.release()
2234 self
._OnIndexChange
(index
.app_id())
2236 def DeleteIndex(self
, index
, trusted
=False, calling_app
=None):
2237 CheckAppId(trusted
, calling_app
, index
.app_id())
2239 stored_index
= self
.__FindIndex
(index
)
2240 Check(stored_index
, 'Index does not exist.')
2243 app
= index
.app_id()
2244 self
.__indexes
_lock
.acquire()
2246 self
.__indexes
[app
].remove(stored_index
)
2248 self
.__indexes
_lock
.release()
2250 self
._OnIndexChange
(index
.app_id())
2252 def _SideLoadIndex(self
, index
):
2253 self
.__indexes
[index
.app()].append(index
)
2255 def _OnIndexChange(self
, app_id
):
2259 class BaseDatastore(BaseTransactionManager
, BaseIndexManager
):
2260 """A base implemenation of a Datastore.
2262 This class implements common functions associated with a datastore and
2263 enforces security restrictions passed on by a stub or client. It is designed
2264 to be shared by any number of threads or clients serving any number of apps.
2266 If an app is not specified explicitly it is pulled from the env and assumed to
2272 _MAX_QUERY_COMPONENTS
= 100
2280 _MAX_ACTIONS_PER_TXN
= 5
2282 def __init__(self
, require_indexes
=False, consistency_policy
=None,
2283 use_atexit
=True, auto_id_policy
=SEQUENTIAL
):
2284 BaseTransactionManager
.__init
__(self
, consistency_policy
=consistency_policy
)
2285 BaseIndexManager
.__init
__(self
)
2287 self
._require
_indexes
= require_indexes
2288 self
._pseudo
_kinds
= {}
2289 self
.SetAutoIdPolicy(auto_id_policy
)
2296 atexit
.register(self
.Write
)
2299 """Clears out all stored values."""
2301 BaseTransactionManager
.Clear(self
)
2304 def _RegisterPseudoKind(self
, kind
):
2305 """Registers a pseudo kind to be used to satisfy a meta data query."""
2306 self
._pseudo
_kinds
[kind
.name
] = kind
2307 kind
._stub
= weakref
.proxy(self
)
2312 def GetQueryCursor(self
, raw_query
, trusted
=False, calling_app
=None,
2313 filter_predicate
=None):
2317 raw_query: The non-validated datastore_pb.Query to run.
2318 trusted: If the calling app is trusted.
2319 calling_app: The app requesting the results or None to pull the app from
2321 filter_predicate: an additional filter of type
2322 datastore_query.FilterPredicate. This is passed along to implement V4
2323 specific filters without changing the entire stub.
2326 A BaseCursor that can be used to retrieve results.
2329 calling_app
= datastore_types
.ResolveAppId(calling_app
)
2330 CheckAppId(trusted
, calling_app
, raw_query
.app())
2333 filters
, orders
= datastore_index
.Normalize(raw_query
.filter_list(),
2334 raw_query
.order_list(),
2335 raw_query
.property_name_list())
2338 CheckQuery(raw_query
, filters
, orders
, self
._MAX
_QUERY
_COMPONENTS
)
2339 FillUsersInQuery(filters
)
2345 if filter_predicate
is None:
2346 self
._CheckHasIndex
(raw_query
, trusted
, calling_app
)
2349 index_list
= self
.__IndexListForQuery
(raw_query
)
2352 if raw_query
.has_transaction():
2354 Check(raw_query
.kind() not in self
._pseudo
_kinds
,
2355 'transactional queries on "%s" not allowed' % raw_query
.kind())
2356 txn
= self
.GetTxn(raw_query
.transaction(), trusted
, calling_app
)
2357 return txn
.GetQueryCursor(raw_query
, filters
, orders
, index_list
)
2359 if raw_query
.has_ancestor() and raw_query
.kind() not in self
._pseudo
_kinds
:
2361 txn
= self
._BeginTransaction
(raw_query
.app(), False)
2362 return txn
.GetQueryCursor(raw_query
, filters
, orders
, index_list
,
2367 return self
._GetQueryCursor
(raw_query
, filters
, orders
, index_list
,
2370 def __IndexListForQuery(self
, query
):
2371 """Get the single composite index pb used by the query, if any, as a list.
2374 query: the datastore_pb.Query to compute the index list for
2377 A singleton list of the composite index pb used by the query,
2380 required
, kind
, ancestor
, props
= (
2381 datastore_index
.CompositeIndexForQuery(query
))
2384 composite_index_pb
= entity_pb
.CompositeIndex()
2385 composite_index_pb
.set_app_id(query
.app())
2386 composite_index_pb
.set_id(0)
2387 composite_index_pb
.set_state(entity_pb
.CompositeIndex
.READ_WRITE
)
2388 index_pb
= composite_index_pb
.mutable_definition()
2389 index_pb
.set_entity_type(kind
)
2390 index_pb
.set_ancestor(bool(ancestor
))
2391 for name
, direction
in datastore_index
.GetRecommendedIndexProperties(props
):
2392 prop_pb
= entity_pb
.Index_Property()
2393 prop_pb
.set_name(name
)
2394 prop_pb
.set_direction(direction
)
2395 index_pb
.property_list().append(prop_pb
)
2396 return [composite_index_pb
]
2398 def Get(self
, raw_keys
, transaction
=None, eventual_consistency
=False,
2399 trusted
=False, calling_app
=None):
2400 """Get the entities for the given keys.
2403 raw_keys: A list of unverified entity_pb.Reference objects.
2404 transaction: The datastore_pb.Transaction to use or None.
2405 eventual_consistency: If we should allow stale, potentially inconsistent
2407 trusted: If the calling app is trusted.
2408 calling_app: The app requesting the results or None to pull the app from
2412 A list containing the entity or None if no entity exists.
2418 calling_app
= datastore_types
.ResolveAppId(calling_app
)
2420 if not transaction
and eventual_consistency
:
2423 for key
in raw_keys
:
2424 CheckReference(calling_app
, trusted
, key
)
2425 result
.append(self
._GetWithPseudoKinds
(None, key
))
2431 grouped_keys
= collections
.defaultdict(list)
2432 for i
, key
in enumerate(raw_keys
):
2433 CheckReference(trusted
, calling_app
, key
)
2434 entity_group
= _GetEntityGroup(key
)
2435 entity_group_key
= datastore_types
.ReferenceToKeyValue(entity_group
)
2436 grouped_keys
[entity_group_key
].append((key
, i
))
2440 txn
= self
.GetTxn(transaction
, trusted
, calling_app
)
2441 return [self
._GetWithPseudoKinds
(txn
, key
) for key
in raw_keys
]
2445 result
= [None] * len(raw_keys
)
2449 result
[i
] = self
._GetWithPseudoKinds
(txn
, key
)
2450 for keys
in grouped_keys
.itervalues():
2451 self
._RunInTxn
(keys
, keys
[0][0].app(), op
)
2454 def _GetWithPseudoKinds(self
, txn
, key
):
2455 """Fetch entity key in txn, taking account of pseudo-kinds."""
2456 pseudo_kind
= self
._pseudo
_kinds
.get(_GetKeyKind(key
), None)
2458 return pseudo_kind
.Get(txn
, key
)
2462 return self
._Get
(key
)
2464 def Put(self
, raw_entities
, cost
, transaction
=None,
2465 trusted
=False, calling_app
=None):
2466 """Writes the given given entities.
2468 Updates an entity's key and entity_group in place if needed
2471 raw_entities: A list of unverified entity_pb.EntityProto objects.
2472 cost: Out param. The cost of putting the provided entities.
2473 transaction: The datastore_pb.Transaction to use or None.
2474 trusted: If the calling app is trusted.
2475 calling_app: The app requesting the results or None to pull the app from
2478 A list of entity_pb.Reference objects that indicates where each entity
2481 if not raw_entities
:
2484 calling_app
= datastore_types
.ResolveAppId(calling_app
)
2487 result
= [None] * len(raw_entities
)
2488 grouped_entities
= collections
.defaultdict(list)
2489 for i
, raw_entity
in enumerate(raw_entities
):
2490 CheckEntity(trusted
, calling_app
, raw_entity
)
2494 entity
= entity_pb
.EntityProto()
2495 entity
.CopyFrom(raw_entity
)
2498 for prop
in itertools
.chain(entity
.property_list(),
2499 entity
.raw_property_list()):
2502 last_element
= entity
.key().path().element_list()[-1]
2503 if not (last_element
.id() or last_element
.has_name()):
2507 if self
._auto
_id
_policy
== SEQUENTIAL
:
2508 last_element
.set_id(self
._AllocateSequentialIds
(entity
.key())[0])
2510 full_key
= self
._AllocateIds
([entity
.key()])[0]
2511 last_element
.set_id(full_key
.path().element_list()[-1].id())
2515 entity_group
= _GetEntityGroup(entity
.key())
2516 entity
.mutable_entity_group().CopyFrom(entity_group
.path())
2517 entity_group_key
= datastore_types
.ReferenceToKeyValue(entity_group
)
2518 grouped_entities
[entity_group_key
].append((entity
, insert
))
2522 key
= entity_pb
.Reference()
2523 key
.CopyFrom(entity
.key())
2528 txn
= self
.GetTxn(transaction
, trusted
, calling_app
)
2529 for group
in grouped_entities
.values():
2530 for entity
, insert
in group
:
2532 indexes
= _FilterIndexesByKind(entity
.key(), self
.GetIndexes(
2533 entity
.key().app(), trusted
, calling_app
))
2534 txn
.Put(entity
, insert
, indexes
)
2537 for entities
in grouped_entities
.itervalues():
2538 txn_cost
= self
._RunInTxn
(
2539 entities
, entities
[0][0].key().app(),
2541 lambda txn
, v
: txn
.Put(v
[0], v
[1], _FilterIndexesByKind(
2543 self
.GetIndexes(v
[0].key().app(), trusted
, calling_app
))))
2544 _UpdateCost(cost
, txn_cost
.entity_writes(), txn_cost
.index_writes())
2547 def Delete(self
, raw_keys
, cost
, transaction
=None,
2548 trusted
=False, calling_app
=None):
2549 """Deletes the entities associated with the given keys.
2552 raw_keys: A list of unverified entity_pb.Reference objects.
2553 cost: Out param. The cost of putting the provided entities.
2554 transaction: The datastore_pb.Transaction to use or None.
2555 trusted: If the calling app is trusted.
2556 calling_app: The app requesting the results or None to pull the app from
2562 calling_app
= datastore_types
.ResolveAppId(calling_app
)
2565 grouped_keys
= collections
.defaultdict(list)
2566 for key
in raw_keys
:
2567 CheckReference(trusted
, calling_app
, key
)
2568 entity_group
= _GetEntityGroup(key
)
2569 entity_group_key
= datastore_types
.ReferenceToKeyValue(entity_group
)
2570 grouped_keys
[entity_group_key
].append(key
)
2574 txn
= self
.GetTxn(transaction
, trusted
, calling_app
)
2575 for key
in raw_keys
:
2577 indexes
= _FilterIndexesByKind(key
, self
.GetIndexes(
2578 key
.app(), trusted
, calling_app
))
2579 txn
.Delete(key
, indexes
)
2582 for keys
in grouped_keys
.itervalues():
2584 txn_cost
= self
._RunInTxn
(
2585 keys
, keys
[0].app(),
2586 lambda txn
, key
: txn
.Delete(key
, _FilterIndexesByKind(
2587 key
, self
.GetIndexes(key
.app(), trusted
, calling_app
))))
2588 _UpdateCost(cost
, txn_cost
.entity_writes(), txn_cost
.index_writes())
2590 def Touch(self
, raw_keys
, trusted
=False, calling_app
=None):
2591 """Applies all outstanding writes."""
2592 calling_app
= datastore_types
.ResolveAppId(calling_app
)
2594 grouped_keys
= collections
.defaultdict(list)
2595 for key
in raw_keys
:
2596 CheckReference(trusted
, calling_app
, key
)
2597 entity_group
= _GetEntityGroup(key
)
2598 entity_group_key
= datastore_types
.ReferenceToKeyValue(entity_group
)
2599 grouped_keys
[entity_group_key
].append(key
)
2601 for keys
in grouped_keys
.itervalues():
2602 self
._RunInTxn
(keys
, keys
[0].app(), lambda txn
, key
: None)
2604 def _RunInTxn(self
, values
, app
, op
):
2605 """Runs the given values in a separate Txn.
2607 Retries up to _RETRIES times on CONCURRENT_TRANSACTION errors.
2610 values: A list of arguments to op.
2611 app: The app to create the Txn on.
2612 op: A function to run on each value in the Txn.
2615 The cost of the txn.
2618 backoff
= _INITIAL_RETRY_DELAY_MS
/ 1000.0
2621 txn
= self
._BeginTransaction
(app
, False)
2622 for value
in values
:
2625 except apiproxy_errors
.ApplicationError
, e
:
2626 if e
.application_error
== datastore_pb
.Error
.CONCURRENT_TRANSACTION
:
2629 if retries
<= _RETRIES
:
2631 backoff
*= _RETRY_DELAY_MULTIPLIER
2632 if backoff
* 1000.0 > _MAX_RETRY_DELAY_MS
:
2633 backoff
= _MAX_RETRY_DELAY_MS
/ 1000.0
2637 def _CheckHasIndex(self
, query
, trusted
=False, calling_app
=None):
2638 """Checks if the query can be satisfied given the existing indexes.
2641 query: the datastore_pb.Query to check
2642 trusted: True if the calling app is trusted (like dev_admin_console)
2643 calling_app: app_id of the current running application
2645 if query
.kind() in self
._pseudo
_kinds
or not self
._require
_indexes
:
2648 minimal_index
= datastore_index
.MinimalCompositeIndexForQuery(query
,
2649 (datastore_index
.ProtoToIndexDefinition(index
)
2650 for index
in self
.GetIndexes(query
.app(), trusted
, calling_app
)
2651 if index
.state() == entity_pb
.CompositeIndex
.READ_WRITE
))
2652 if minimal_index
is not None:
2653 msg
= ('This query requires a composite index that is not defined. '
2654 'You must update the index.yaml file in your application root.')
2655 is_most_efficient
, kind
, ancestor
, properties
= minimal_index
2656 if not is_most_efficient
:
2658 yaml
= datastore_index
.IndexYamlForQuery(kind
, ancestor
,
2659 datastore_index
.GetRecommendedIndexProperties(properties
))
2660 msg
+= '\nThe following index is the minimum index required:\n' + yaml
2661 raise apiproxy_errors
.ApplicationError(datastore_pb
.Error
.NEED_INDEX
, msg
)
2663 def SetAutoIdPolicy(self
, auto_id_policy
):
2664 """Set value of _auto_id_policy flag (default SEQUENTIAL).
2666 SEQUENTIAL auto ID assignment behavior will eventually be deprecated
2667 and the default will be SCATTERED.
2670 auto_id_policy: string constant.
2672 TypeError: if auto_id_policy is not one of SEQUENTIAL or SCATTERED.
2674 valid_policies
= (SEQUENTIAL
, SCATTERED
)
2675 if auto_id_policy
not in valid_policies
:
2676 raise TypeError('auto_id_policy must be in %s, found %s instead',
2677 valid_policies
, auto_id_policy
)
2678 self
._auto
_id
_policy
= auto_id_policy
2683 """Writes the datastore to disk."""
2686 def _GetQueryCursor(self
, query
, filters
, orders
, index_list
,
2688 """Runs the given datastore_pb.Query and returns a QueryCursor for it.
2690 This must be implemented by a sub-class. The sub-class does not need to
2691 enforced any consistency guarantees (and can just blindly read).
2694 query: The datastore_pb.Query to run.
2695 filters: A list of filters that override the ones found on query.
2696 orders: A list of orders that override the ones found on query.
2697 index_list: A list of indexes used by the query.
2698 filter_predicate: an additional filter of type
2699 datastore_query.FilterPredicate. This is passed along to implement V4
2700 specific filters without changing the entire stub.
2703 A BaseCursor that can be used to fetch query results.
2705 raise NotImplementedError
2707 def _Get(self
, reference
):
2708 """Get the entity for the given reference or None.
2710 This must be implemented by a sub-class. The sub-class does not need to
2711 enforced any consistency guarantees (and can just blindly read).
2714 reference: A entity_pb.Reference to loop up.
2717 The entity_pb.EntityProto associated with the given reference or None.
2719 raise NotImplementedError
2721 def _AllocateSequentialIds(self
, reference
, size
=1, max_id
=None):
2722 """Allocate sequential ids for given reference.
2725 reference: An entity_pb.Reference to allocate an id for.
2726 size: The size of the range to allocate
2727 max_id: The upper bound of the range to allocate
2730 A tuple containing (min, max) of the allocated range.
2732 raise NotImplementedError
2734 def _AllocateIds(self
, references
):
2735 """Allocate or reserves IDs for the v4 datastore API.
2737 Incomplete keys are allocated scattered IDs. Complete keys have every id in
2738 their paths reserved in the appropriate ID space.
2741 references: a list of entity_pb.Reference objects to allocate or reserve
2744 a list of complete entity_pb.Reference objects corresponding to the
2745 incomplete keys in the input, with newly allocated ids.
2747 raise NotImplementedError
2750 def _NeedsIndexes(func
):
2751 """A decorator for DatastoreStub methods that require or affect indexes.
2753 Updates indexes to match index.yaml before the call and updates index.yaml
2754 after the call if require_indexes is False. If root_path is not set, this is a
2758 def UpdateIndexesWrapper(self
, *args
, **kwargs
):
2759 self
._SetupIndexes
()
2761 return func(self
, *args
, **kwargs
)
2763 self
._UpdateIndexes
()
2765 return UpdateIndexesWrapper
2768 class EntityGroupPseudoKind(object):
2769 """A common implementation of get() for the __entity_group__ pseudo-kind.
2772 name: the pseudo-kind name
2774 name
= '__entity_group__'
2784 base_version
= int(time
.time() * 1e6
)
2786 def Get(self
, txn
, key
):
2787 """Fetch key of this pseudo-kind within txn.
2790 txn: transaction within which Get occurs, may be None if this is an
2791 eventually consistent Get.
2792 key: key of pseudo-entity to Get.
2795 An entity for key, or None if it doesn't exist.
2799 txn
= self
._stub
._BeginTransaction
(key
.app(), False)
2801 return self
.Get(txn
, key
)
2806 if isinstance(txn
._txn
_manager
._consistency
_policy
,
2807 MasterSlaveConsistencyPolicy
):
2816 if path
.element_size() != 2 or path
.element_list()[-1].id() != 1:
2819 tracker
= txn
._GetTracker
(key
)
2820 tracker
._GrabSnapshot
(txn
._txn
_manager
)
2822 eg
= entity_pb
.EntityProto()
2823 eg
.mutable_key().CopyFrom(key
)
2824 eg
.mutable_entity_group().CopyFrom(_GetEntityGroup(key
).path())
2825 version
= entity_pb
.Property()
2826 version
.set_name('__version__')
2827 version
.set_multiple(False)
2828 version
.mutable_value().set_int64value(
2829 tracker
._read
_pos
+ self
.base_version
)
2830 eg
.property_list().append(version
)
2833 def Query(self
, query
, filters
, orders
):
2834 """Perform a query on this pseudo-kind.
2837 query: the original datastore_pb.Query.
2838 filters: the filters from query.
2839 orders: the orders from query.
2842 always raises an error
2846 raise apiproxy_errors
.ApplicationError(
2847 datastore_pb
.Error
.BAD_REQUEST
, 'queries not supported on ' + self
.name
)
2850 class _CachedIndexDefinitions(object):
2851 """Records definitions read from index configuration files for later reuse.
2853 If the names and modification times of the configuration files are unchanged,
2854 then the index configurations previously parsed out of those files can be
2858 file_names: a list of the names of the configuration files. This will have
2859 one element when the configuration is based on an index.yaml but may have
2860 more than one if it is based on datastore-indexes.xml and
2861 datastore-indexes-auto.xml.
2862 last_modifieds: a list of floats that are the modification times of the
2863 files in file_names.
2864 index_protos: a list of entity_pb.CompositeIndex objects corresponding to
2865 the index definitions read from file_names.
2868 def __init__(self
, file_names
, last_modifieds
, index_protos
):
2870 assert len(file_names
) <= 1
2871 self
.file_names
= file_names
2872 self
.last_modifieds
= last_modifieds
2873 self
.index_protos
= index_protos
2876 class DatastoreStub(object):
2877 """A stub that maps datastore service calls on to a BaseDatastore.
2879 This class also keeps track of query cursors.
2887 super(DatastoreStub
, self
).__init
__()
2888 self
._datastore
= datastore
2889 self
._app
_id
= datastore_types
.ResolveAppId(app_id
)
2890 self
._trusted
= trusted
2891 self
._root
_path
= root_path
2894 self
.__query
_history
= {}
2897 self
.__query
_ci
_history
= set()
2901 self
._cached
_index
_definitions
= _CachedIndexDefinitions([], [], None)
2903 if self
._require
_indexes
or root_path
is None:
2905 self
._index
_yaml
_updater
= None
2908 self
._index
_yaml
_updater
= datastore_stub_index
.IndexYamlUpdater(
2911 DatastoreStub
.Clear(self
)
2914 """Clears out all stored values."""
2915 self
._query
_cursors
= {}
2916 self
.__query
_history
= {}
2917 self
.__query
_ci
_history
= set()
2919 def QueryHistory(self
):
2920 """Returns a dict that maps Query PBs to times they've been run."""
2922 return dict((pb
, times
) for pb
, times
in self
.__query
_history
.items()
2923 if pb
.app() == self
._app
_id
)
2925 def _QueryCompositeIndexHistoryLength(self
):
2926 """Returns the length of the CompositeIndex set for query history."""
2927 return len(self
.__query
_ci
_history
)
2929 def SetTrusted(self
, trusted
):
2930 """Set/clear the trusted bit in the stub.
2932 This bit indicates that the app calling the stub is trusted. A
2933 trusted app can write to datastores of other apps.
2938 self
._trusted
= trusted
2942 def _Dynamic_Get(self
, req
, res
):
2945 transaction
= req
.has_transaction() and req
.transaction() or None
2948 if req
.allow_deferred() and req
.key_size() > _MAXIMUM_RESULTS
:
2952 keys_to_get
= req
.key_list()[-_MAXIMUM_RESULTS
:]
2953 deferred_keys
= req
.key_list()[:-_MAXIMUM_RESULTS
]
2954 res
.deferred_list().extend(deferred_keys
)
2957 keys_to_get
= req
.key_list()
2959 res
.set_in_order(not req
.allow_deferred())
2961 total_response_bytes
= 0
2962 for index
, entity
in enumerate(self
._datastore
.Get(keys_to_get
,
2964 req
.has_failover_ms(),
2967 entity_size
= entity
and entity
.ByteSize() or 0
2970 if (req
.allow_deferred()
2972 and total_response_bytes
+ entity_size
> _MAXIMUM_QUERY_RESULT_BYTES
):
2974 res
.deferred_list().extend(keys_to_get
[index
:])
2977 entity_result
= res
.add_entity()
2978 entity_result
.mutable_entity().CopyFrom(entity
)
2979 total_response_bytes
+= entity_size
2982 entity_result
= res
.add_entity()
2983 entity_result
.mutable_key().CopyFrom(keys_to_get
[index
])
2985 def _Dynamic_Put(self
, req
, res
):
2986 transaction
= req
.has_transaction() and req
.transaction() or None
2987 res
.key_list().extend(self
._datastore
.Put(req
.entity_list(),
2990 self
._trusted
, self
._app
_id
))
2992 def _Dynamic_Delete(self
, req
, res
):
2993 transaction
= req
.has_transaction() and req
.transaction() or None
2994 self
._datastore
.Delete(req
.key_list(), res
.mutable_cost(), transaction
,
2995 self
._trusted
, self
._app
_id
)
2997 def _Dynamic_Touch(self
, req
, _
):
2998 self
._datastore
.Touch(req
.key_list(), self
._trusted
, self
._app
_id
)
3001 def _Dynamic_RunQuery(self
, query
, query_result
, filter_predicate
=None):
3002 self
.__UpgradeCursors
(query
)
3003 cursor
= self
._datastore
.GetQueryCursor(query
, self
._trusted
, self
._app
_id
,
3006 if query
.has_count():
3007 count
= query
.count()
3008 elif query
.has_limit():
3009 count
= query
.limit()
3011 count
= self
._BATCH
_SIZE
3013 cursor
.PopulateQueryResult(query_result
, count
, query
.offset(),
3014 query
.compile(), first_result
=True)
3015 if query_result
.has_cursor():
3016 self
._query
_cursors
[query_result
.cursor().cursor()] = cursor
3022 compiled_query
= query_result
.mutable_compiled_query()
3023 compiled_query
.set_keys_only(query
.keys_only())
3024 compiled_query
.mutable_primaryscan().set_index_name(query
.Encode())
3025 self
.__UpdateQueryHistory
(query
)
3027 def __UpgradeCursors(self
, query
):
3028 """Upgrades compiled cursors in place.
3030 If the cursor position does not specify before_ascending, populate it.
3031 If before_ascending is already populated, use it and the sort direction
3032 from the query to set an appropriate value for start_inclusive.
3035 query: datastore_pb.Query
3037 first_sort_direction
= None
3038 if query
.order_list():
3039 first_sort_direction
= query
.order(0).direction()
3041 for compiled_cursor
in [query
.compiled_cursor(),
3042 query
.end_compiled_cursor()]:
3043 self
.__UpgradeCursor
(compiled_cursor
, first_sort_direction
)
3045 def __UpgradeCursor(self
, compiled_cursor
, first_sort_direction
):
3046 """Upgrades a compiled cursor in place.
3048 If the cursor position does not specify before_ascending, populate it.
3049 If before_ascending is already populated, use it and the provided direction
3050 to set an appropriate value for start_inclusive.
3053 compiled_cursor: datastore_pb.CompiledCursor
3054 first_sort_direction: first sort direction from the query or None
3058 if not self
.__IsPlannable
(compiled_cursor
):
3060 elif compiled_cursor
.position().has_before_ascending():
3061 _SetStartInclusive(compiled_cursor
.position(), first_sort_direction
)
3062 elif compiled_cursor
.position().has_start_inclusive():
3063 _SetBeforeAscending(compiled_cursor
.position(), first_sort_direction
)
3065 def __IsPlannable(self
, compiled_cursor
):
3066 """Returns True if compiled_cursor is plannable.
3069 compiled_cursor: datastore_pb.CompiledCursor
3071 position
= compiled_cursor
.position()
3072 return position
.has_key() or position
.indexvalue_list()
3074 def __UpdateQueryHistory(self
, query
):
3076 clone
= datastore_pb
.Query()
3077 clone
.CopyFrom(query
)
3080 clone
.clear_offset()
3082 if clone
in self
.__query
_history
:
3083 self
.__query
_history
[clone
] += 1
3085 self
.__query
_history
[clone
] = 1
3086 if clone
.app() == self
._app
_id
:
3087 self
.__query
_ci
_history
.add(
3088 datastore_index
.CompositeIndexForQuery(clone
))
3090 def _Dynamic_Next(self
, next_request
, query_result
):
3091 app
= next_request
.cursor().app()
3092 CheckAppId(self
._trusted
, self
._app
_id
, app
)
3094 cursor
= self
._query
_cursors
.get(next_request
.cursor().cursor())
3095 Check(cursor
and cursor
.app
== app
,
3096 'Cursor %d not found' % next_request
.cursor().cursor())
3098 count
= self
._BATCH
_SIZE
3099 if next_request
.has_count():
3100 count
= next_request
.count()
3102 cursor
.PopulateQueryResult(query_result
, count
, next_request
.offset(),
3103 next_request
.compile(), first_result
=False)
3105 if not query_result
.has_cursor():
3106 del self
._query
_cursors
[next_request
.cursor().cursor()]
3108 def _Dynamic_AddActions(self
, request
, _
):
3109 """Associates the creation of one or more tasks with a transaction.
3112 request: A taskqueue_service_pb.TaskQueueBulkAddRequest containing the
3113 tasks that should be created when the transaction is committed.
3119 if not request
.add_request_list():
3122 transaction
= request
.add_request_list()[0].transaction()
3123 txn
= self
._datastore
.GetTxn(transaction
, self
._trusted
, self
._app
_id
)
3125 for add_request
in request
.add_request_list():
3129 Check(add_request
.transaction() == transaction
,
3130 'Cannot add requests to different transactions')
3131 clone
= taskqueue_service_pb
.TaskQueueAddRequest()
3132 clone
.CopyFrom(add_request
)
3133 clone
.clear_transaction()
3134 new_actions
.append(clone
)
3136 txn
.AddActions(new_actions
, self
._MAX
_ACTIONS
_PER
_TXN
)
3138 def _Dynamic_BeginTransaction(self
, req
, transaction
):
3139 CheckAppId(self
._trusted
, self
._app
_id
, req
.app())
3140 transaction
.CopyFrom(self
._datastore
.BeginTransaction(
3141 req
.app(), req
.allow_multiple_eg()))
3143 def _Dynamic_Commit(self
, transaction
, res
):
3144 CheckAppId(self
._trusted
, self
._app
_id
, transaction
.app())
3145 txn
= self
._datastore
.GetTxn(transaction
, self
._trusted
, self
._app
_id
)
3146 res
.mutable_cost().CopyFrom(txn
.Commit())
3148 def _Dynamic_Rollback(self
, transaction
, _
):
3149 CheckAppId(self
._trusted
, self
._app
_id
, transaction
.app())
3150 txn
= self
._datastore
.GetTxn(transaction
, self
._trusted
, self
._app
_id
)
3153 def _Dynamic_CreateIndex(self
, index
, id_response
):
3154 id_response
.set_value(self
._datastore
.CreateIndex(index
,
3159 def _Dynamic_GetIndices(self
, app_str
, composite_indices
):
3160 composite_indices
.index_list().extend(self
._datastore
.GetIndexes(
3161 app_str
.value(), self
._trusted
, self
._app
_id
))
3163 def _Dynamic_UpdateIndex(self
, index
, _
):
3164 self
._datastore
.UpdateIndex(index
, self
._trusted
, self
._app
_id
)
3166 def _Dynamic_DeleteIndex(self
, index
, _
):
3167 self
._datastore
.DeleteIndex(index
, self
._trusted
, self
._app
_id
)
3169 def _Dynamic_AllocateIds(self
, allocate_ids_request
, allocate_ids_response
):
3170 Check(not allocate_ids_request
.has_model_key()
3171 or not allocate_ids_request
.reserve_list(),
3172 'Cannot allocate and reserve IDs in the same request')
3173 if allocate_ids_request
.reserve_list():
3174 Check(not allocate_ids_request
.has_size(),
3175 'Cannot specify size when reserving IDs')
3176 Check(not allocate_ids_request
.has_max(),
3177 'Cannot specify max when reserving IDs')
3179 if allocate_ids_request
.has_model_key():
3180 CheckAppId(allocate_ids_request
.model_key().app(),
3181 self
._trusted
, self
._app
_id
)
3183 reference
= allocate_ids_request
.model_key()
3185 (start
, end
) = self
._datastore
._AllocateSequentialIds
(
3186 reference
, allocate_ids_request
.size(), allocate_ids_request
.max())
3188 allocate_ids_response
.set_start(start
)
3189 allocate_ids_response
.set_end(end
)
3191 for reference
in allocate_ids_request
.reserve_list():
3192 CheckAppId(reference
.app(), self
._trusted
, self
._app
_id
)
3193 self
._datastore
._AllocateIds
(allocate_ids_request
.reserve_list())
3194 allocate_ids_response
.set_start(0)
3195 allocate_ids_response
.set_end(0)
3197 def _SetupIndexes(self
, _open
=open):
3198 """Ensure that the set of existing composite indexes matches index.yaml.
3200 Note: this is similar to the algorithm used by the admin console for
3206 if not self
._root
_path
:
3208 file_names
= [os
.path
.join(self
._root
_path
, 'index.yaml')]
3209 file_mtimes
= [os
.path
.getmtime(f
) for f
in file_names
if os
.path
.exists(f
)]
3210 if (self
._cached
_index
_definitions
.file_names
== file_names
and
3211 all(os
.path
.exists(f
) for f
in file_names
) and
3212 self
._cached
_index
_definitions
.last_modifieds
== file_mtimes
):
3213 requested_indexes
= self
._cached
_index
_definitions
.index_protos
3217 for file_name
in file_names
:
3219 file_mtimes
.append(os
.path
.getmtime(file_name
))
3220 with
_open(file_name
, 'r') as fh
:
3221 index_texts
.append(fh
.read())
3222 except (OSError, IOError):
3225 requested_indexes
= []
3226 if len(index_texts
) == len(file_names
):
3228 for index_text
in index_texts
:
3230 index_defs
= datastore_index
.ParseIndexDefinitions(index_text
)
3231 if index_defs
is None or index_defs
.indexes
is None:
3235 requested_indexes
.extend(
3236 datastore_index
.IndexDefinitionsToProtos(
3237 self
._app
_id
, index_defs
.indexes
))
3239 self
._cached
_index
_definitions
= _CachedIndexDefinitions(
3240 file_names
, file_mtimes
, requested_indexes
)
3243 existing_indexes
= self
._datastore
.GetIndexes(
3244 self
._app
_id
, self
._trusted
, self
._app
_id
)
3247 requested
= dict((x
.definition().Encode(), x
) for x
in requested_indexes
)
3248 existing
= dict((x
.definition().Encode(), x
) for x
in existing_indexes
)
3252 for key
, index
in requested
.iteritems():
3253 if key
not in existing
:
3254 new_index
= entity_pb
.CompositeIndex()
3255 new_index
.CopyFrom(index
)
3256 new_index
.set_id(datastore_admin
.CreateIndex(new_index
))
3257 new_index
.set_state(entity_pb
.CompositeIndex
.READ_WRITE
)
3258 datastore_admin
.UpdateIndex(new_index
)
3263 for key
, index
in existing
.iteritems():
3264 if key
not in requested
:
3265 datastore_admin
.DeleteIndex(index
)
3269 if created
or deleted
:
3270 logging
.debug('Created %d and deleted %d index(es); total %d',
3271 created
, deleted
, len(requested
))
3273 def _UpdateIndexes(self
):
3274 if self
._index
_yaml
_updater
is not None:
3275 self
._index
_yaml
_updater
.UpdateIndexYaml()
3278 class StubQueryConverter(object):
3279 """Converter for v3 and v4 queries suitable for use in stubs."""
3281 def __init__(self
, entity_converter
):
3282 self
._entity
_converter
= entity_converter
3284 def v4_to_v3_compiled_cursor(self
, v4_cursor
, v3_compiled_cursor
):
3285 """Converts a v4 cursor string to a v3 CompiledCursor.
3288 v4_cursor: a string representing a v4 query cursor
3289 v3_compiled_cursor: a datastore_pb.CompiledCursor to populate
3291 v3_compiled_cursor
.Clear()
3293 v3_compiled_cursor
.ParseFromString(v4_cursor
)
3294 except ProtocolBuffer
.ProtocolBufferDecodeError
:
3295 raise datastore_pbs
.InvalidConversionError('Invalid query cursor.')
3297 def v3_to_v4_compiled_cursor(self
, v3_compiled_cursor
):
3298 """Converts a v3 CompiledCursor to a v4 cursor string.
3301 v3_compiled_cursor: a datastore_pb.CompiledCursor
3304 a string representing a v4 query cursor
3306 return v3_compiled_cursor
.SerializeToString()
3308 def v4_to_v3_query(self
, v4_partition_id
, v4_query
, v3_query
):
3309 """Converts a v4 Query to a v3 Query.
3312 v4_partition_id: a datastore_v4_pb.PartitionId
3313 v4_query: a datastore_v4_pb.Query
3314 v3_query: a datastore_pb.Query to populate
3317 InvalidConversionError if the query cannot be converted
3321 if v4_partition_id
.dataset_id():
3322 v3_query
.set_app(v4_partition_id
.dataset_id())
3323 if v4_partition_id
.has_namespace():
3324 v3_query
.set_name_space(v4_partition_id
.namespace())
3326 v3_query
.set_persist_offset(True)
3327 v3_query
.set_require_perfect_plan(True)
3328 v3_query
.set_compile(True)
3331 if v4_query
.has_limit():
3332 v3_query
.set_limit(v4_query
.limit())
3333 if v4_query
.offset():
3334 v3_query
.set_offset(v4_query
.offset())
3335 if v4_query
.has_start_cursor():
3336 self
.v4_to_v3_compiled_cursor(v4_query
.start_cursor(),
3337 v3_query
.mutable_compiled_cursor())
3338 if v4_query
.has_end_cursor():
3339 self
.v4_to_v3_compiled_cursor(v4_query
.end_cursor(),
3340 v3_query
.mutable_end_compiled_cursor())
3343 if v4_query
.kind_list():
3344 datastore_pbs
.check_conversion(len(v4_query
.kind_list()) == 1,
3345 'multiple kinds not supported')
3346 v3_query
.set_kind(v4_query
.kind(0).name())
3349 has_key_projection
= False
3350 for prop
in v4_query
.projection_list():
3351 if prop
.property().name() == datastore_pbs
.PROPERTY_NAME_KEY
:
3352 has_key_projection
= True
3354 v3_query
.add_property_name(prop
.property().name())
3355 if has_key_projection
and not v3_query
.property_name_list():
3356 v3_query
.set_keys_only(True)
3359 for prop
in v4_query
.group_by_list():
3360 v3_query
.add_group_by_property_name(prop
.name())
3363 self
.__populate
_v
3_filters
(v4_query
.filter(), v3_query
)
3366 for v4_order
in v4_query
.order_list():
3367 v3_order
= v3_query
.add_order()
3368 v3_order
.set_property(v4_order
.property().name())
3369 if v4_order
.has_direction():
3370 v3_order
.set_direction(v4_order
.direction())
3372 def v3_to_v4_query(self
, v3_query
, v4_query
):
3373 """Converts a v3 Query to a v4 Query.
3376 v3_query: a datastore_pb.Query
3377 v4_query: a datastore_v4_pb.Query to populate
3380 InvalidConversionError if the query cannot be converted
3384 datastore_pbs
.check_conversion(not v3_query
.has_distinct(),
3385 'distinct option not supported')
3386 datastore_pbs
.check_conversion(v3_query
.require_perfect_plan(),
3387 'non-perfect plans not supported')
3391 if v3_query
.has_limit():
3392 v4_query
.set_limit(v3_query
.limit())
3393 if v3_query
.offset():
3394 v4_query
.set_offset(v3_query
.offset())
3395 if v3_query
.has_compiled_cursor():
3396 v4_query
.set_start_cursor(
3397 self
.v3_to_v4_compiled_cursor(v3_query
.compiled_cursor()))
3398 if v3_query
.has_end_compiled_cursor():
3399 v4_query
.set_end_cursor(
3400 self
.v3_to_v4_compiled_cursor(v3_query
.end_compiled_cursor()))
3403 if v3_query
.has_kind():
3404 v4_query
.add_kind().set_name(v3_query
.kind())
3407 for name
in v3_query
.property_name_list():
3408 v4_query
.add_projection().mutable_property().set_name(name
)
3409 if v3_query
.keys_only():
3410 v4_query
.add_projection().mutable_property().set_name(
3411 datastore_pbs
.PROPERTY_NAME_KEY
)
3414 for name
in v3_query
.group_by_property_name_list():
3415 v4_query
.add_group_by().set_name(name
)
3418 num_v4_filters
= len(v3_query
.filter_list())
3419 if v3_query
.has_ancestor():
3422 if num_v4_filters
== 1:
3423 get_property_filter
= self
.__get
_property
_filter
3424 elif num_v4_filters
>= 1:
3425 v4_query
.mutable_filter().mutable_composite_filter().set_operator(
3426 datastore_v4_pb
.CompositeFilter
.AND
)
3427 get_property_filter
= self
.__add
_property
_filter
3429 if v3_query
.has_ancestor():
3430 self
.__v
3_query
_to
_v
4_ancestor
_filter
(v3_query
,
3431 get_property_filter(v4_query
))
3432 for v3_filter
in v3_query
.filter_list():
3433 self
.__v
3_filter
_to
_v
4_property
_filter
(v3_filter
,
3434 get_property_filter(v4_query
))
3437 for v3_order
in v3_query
.order_list():
3438 v4_order
= v4_query
.add_order()
3439 v4_order
.mutable_property().set_name(v3_order
.property())
3440 if v3_order
.has_direction():
3441 v4_order
.set_direction(v3_order
.direction())
3443 def __get_property_filter(self
, v4_query
):
3444 """Returns the PropertyFilter from the query's top-level filter."""
3445 return v4_query
.mutable_filter().mutable_property_filter()
3447 def __add_property_filter(self
, v4_query
):
3448 """Adds and returns a PropertyFilter from the query's composite filter."""
3449 v4_comp_filter
= v4_query
.mutable_filter().mutable_composite_filter()
3450 return v4_comp_filter
.add_filter().mutable_property_filter()
3452 def __populate_v3_filters(self
, v4_filter
, v3_query
):
3453 """Populates a filters for a v3 Query.
3456 v4_filter: a datastore_v4_pb.Filter
3457 v3_query: a datastore_pb.Query to populate with filters
3460 datastore_pbs
.check_conversion(not v4_filter
.has_bounding_circle_filter(),
3461 'bounding circle filter not supported')
3462 datastore_pbs
.check_conversion(not v4_filter
.has_bounding_box_filter(),
3463 'bounding box filter not supported')
3465 if v4_filter
.has_property_filter():
3466 v4_property_filter
= v4_filter
.property_filter()
3467 if (v4_property_filter
.operator()
3468 == datastore_v4_pb
.PropertyFilter
.HAS_ANCESTOR
):
3469 datastore_pbs
.check_conversion(
3470 v4_property_filter
.value().has_key_value(),
3471 'HAS_ANCESTOR requires a reference value')
3472 datastore_pbs
.check_conversion((v4_property_filter
.property().name()
3473 == datastore_pbs
.PROPERTY_NAME_KEY
),
3474 'unsupported property')
3475 datastore_pbs
.check_conversion(not v3_query
.has_ancestor(),
3476 'duplicate ancestor constraint')
3477 self
._entity
_converter
.v4_to_v3_reference(
3478 v4_property_filter
.value().key_value(),
3479 v3_query
.mutable_ancestor())
3481 v3_filter
= v3_query
.add_filter()
3482 property_name
= v4_property_filter
.property().name()
3483 v3_filter
.set_op(v4_property_filter
.operator())
3484 datastore_pbs
.check_conversion(
3485 not v4_property_filter
.value().list_value_list(),
3486 ('unsupported value type, %s, in property filter'
3487 ' on "%s"' % ('list_value', property_name
)))
3488 prop
= v3_filter
.add_property()
3489 prop
.set_multiple(False)
3490 prop
.set_name(property_name
)
3491 self
._entity
_converter
.v4_value_to_v3_property_value(
3492 v4_property_filter
.value(), prop
.mutable_value())
3493 elif v4_filter
.has_composite_filter():
3494 datastore_pbs
.check_conversion((v4_filter
.composite_filter().operator()
3495 == datastore_v4_pb
.CompositeFilter
.AND
),
3496 'unsupported composite property operator')
3497 for v4_sub_filter
in v4_filter
.composite_filter().filter_list():
3498 self
.__populate
_v
3_filters
(v4_sub_filter
, v3_query
)
3500 def __v3_filter_to_v4_property_filter(self
, v3_filter
, v4_property_filter
):
3501 """Converts a v3 Filter to a v4 PropertyFilter.
3504 v3_filter: a datastore_pb.Filter
3505 v4_property_filter: a datastore_v4_pb.PropertyFilter to populate
3508 InvalidConversionError if the filter cannot be converted
3510 datastore_pbs
.check_conversion(v3_filter
.property_size() == 1,
3512 datastore_pbs
.check_conversion(v3_filter
.op() <= 5,
3513 'unsupported filter op: %d' % v3_filter
.op())
3514 v4_property_filter
.Clear()
3515 v4_property_filter
.set_operator(v3_filter
.op())
3516 v4_property_filter
.mutable_property().set_name(v3_filter
.property(0).name())
3517 self
._entity
_converter
.v3_property_to_v4_value(
3518 v3_filter
.property(0), True, v4_property_filter
.mutable_value())
3520 def __v3_query_to_v4_ancestor_filter(self
, v3_query
, v4_property_filter
):
3521 """Converts a v3 Query to a v4 ancestor PropertyFilter.
3524 v3_query: a datastore_pb.Query
3525 v4_property_filter: a datastore_v4_pb.PropertyFilter to populate
3527 v4_property_filter
.Clear()
3528 v4_property_filter
.set_operator(
3529 datastore_v4_pb
.PropertyFilter
.HAS_ANCESTOR
)
3530 prop
= v4_property_filter
.mutable_property()
3531 prop
.set_name(datastore_pbs
.PROPERTY_NAME_KEY
)
3532 self
._entity
_converter
.v3_to_v4_key(
3533 v3_query
.ancestor(),
3534 v4_property_filter
.mutable_value().mutable_key_value())
3538 __query_converter
= StubQueryConverter(datastore_pbs
.get_entity_converter())
3541 def get_query_converter():
3542 """Returns a converter for v3 and v4 queries (not suitable for production).
3544 This converter is suitable for use in stubs but not for production.
3547 a StubQueryConverter
3549 return __query_converter
3552 class StubServiceConverter(object):
3553 """Converter for v3/v4 request/response protos suitable for use in stubs."""
3555 def __init__(self
, entity_converter
, query_converter
):
3556 self
._entity
_converter
= entity_converter
3557 self
._query
_converter
= query_converter
3559 def v4_to_v3_cursor(self
, v4_query_handle
, v3_cursor
):
3560 """Converts a v4 cursor string to a v3 Cursor.
3563 v4_query_handle: a string representing a v4 query handle
3564 v3_cursor: a datastore_pb.Cursor to populate
3567 v3_cursor
.ParseFromString(v4_query_handle
)
3568 except ProtocolBuffer
.ProtocolBufferDecodeError
:
3569 raise datastore_pbs
.InvalidConversionError('Invalid query handle.')
3572 def _v3_to_v4_query_handle(self
, v3_cursor
):
3573 """Converts a v3 Cursor to a v4 query handle string.
3576 v3_cursor: a datastore_pb.Cursor
3579 a string representing a v4 cursor
3581 return v3_cursor
.SerializeToString()
3583 def v4_to_v3_txn(self
, v4_txn
, v3_txn
):
3584 """Converts a v4 transaction string to a v3 Transaction.
3587 v4_txn: a string representing a v4 transaction
3588 v3_txn: a datastore_pb.Transaction to populate
3591 v3_txn
.ParseFromString(v4_txn
)
3592 except ProtocolBuffer
.ProtocolBufferDecodeError
:
3593 raise datastore_pbs
.InvalidConversionError('Invalid transaction.')
3596 def _v3_to_v4_txn(self
, v3_txn
):
3597 """Converts a v3 Transaction to a v4 transaction string.
3600 v3_txn: a datastore_pb.Transaction
3603 a string representing a v4 transaction
3605 return v3_txn
.SerializeToString()
3610 def v4_to_v3_begin_transaction_req(self
, app_id
, v4_req
):
3611 """Converts a v4 BeginTransactionRequest to a v3 BeginTransactionRequest.
3615 v4_req: a datastore_v4_pb.BeginTransactionRequest
3618 a datastore_pb.BeginTransactionRequest
3620 v3_req
= datastore_pb
.BeginTransactionRequest()
3621 v3_req
.set_app(app_id
)
3622 v3_req
.set_allow_multiple_eg(v4_req
.cross_group())
3625 def v3_to_v4_begin_transaction_resp(self
, v3_resp
):
3626 """Converts a v3 Transaction to a v4 BeginTransactionResponse.
3629 v3_resp: a datastore_pb.Transaction
3632 a datastore_v4_pb.BeginTransactionResponse
3634 v4_resp
= datastore_v4_pb
.BeginTransactionResponse()
3635 v4_resp
.set_transaction(self
._v
3_to
_v
4_txn
(v3_resp
))
3641 def v4_rollback_req_to_v3_txn(self
, v4_req
):
3642 """Converts a v4 RollbackRequest to a v3 Transaction.
3645 v4_req: a datastore_v4_pb.RollbackRequest
3648 a datastore_pb.Transaction
3650 v3_txn
= datastore_pb
.Transaction()
3651 self
.v4_to_v3_txn(v4_req
.transaction(), v3_txn
)
3657 def v4_commit_req_to_v3_txn(self
, v4_req
):
3658 """Converts a v4 CommitRequest to a v3 Transaction.
3661 v4_req: a datastore_v4_pb.CommitRequest
3664 a datastore_pb.Transaction
3666 v3_txn
= datastore_pb
.Transaction()
3667 self
.v4_to_v3_txn(v4_req
.transaction(), v3_txn
)
3673 def v4_run_query_req_to_v3_query(self
, v4_req
):
3674 """Converts a v4 RunQueryRequest to a v3 Query.
3676 GQL is not supported.
3679 v4_req: a datastore_v4_pb.RunQueryRequest
3682 a datastore_pb.Query
3685 datastore_pbs
.check_conversion(not v4_req
.has_gql_query(),
3686 'GQL not supported')
3687 v3_query
= datastore_pb
.Query()
3688 self
._query
_converter
.v4_to_v3_query(v4_req
.partition_id(), v4_req
.query(),
3692 if v4_req
.has_suggested_batch_size():
3693 v3_query
.set_count(v4_req
.suggested_batch_size())
3696 read_options
= v4_req
.read_options()
3697 if read_options
.has_transaction():
3698 self
.v4_to_v3_txn(read_options
.transaction(),
3699 v3_query
.mutable_transaction())
3700 elif (read_options
.read_consistency()
3701 == datastore_v4_pb
.ReadOptions
.EVENTUAL
):
3702 v3_query
.set_strong(False)
3703 v3_query
.set_failover_ms(-1)
3704 elif read_options
.read_consistency() == datastore_v4_pb
.ReadOptions
.STRONG
:
3705 v3_query
.set_strong(True)
3707 if v4_req
.has_min_safe_time_seconds():
3708 v3_query
.set_min_safe_time_seconds(v4_req
.min_safe_time_seconds())
3712 def v3_to_v4_run_query_req(self
, v3_req
):
3713 """Converts a v3 Query to a v4 RunQueryRequest.
3716 v3_req: a datastore_pb.Query
3719 a datastore_v4_pb.RunQueryRequest
3721 v4_req
= datastore_v4_pb
.RunQueryRequest()
3724 v4_partition_id
= v4_req
.mutable_partition_id()
3725 v4_partition_id
.set_dataset_id(v3_req
.app())
3726 if v3_req
.name_space():
3727 v4_partition_id
.set_namespace(v3_req
.name_space())
3730 if v3_req
.has_count():
3731 v4_req
.set_suggested_batch_size(v3_req
.count())
3733 datastore_pbs
.check_conversion(
3734 not (v3_req
.has_transaction() and v3_req
.has_failover_ms()),
3735 'Cannot set failover and transaction handle.')
3738 if v3_req
.has_transaction():
3739 v4_req
.mutable_read_options().set_transaction(
3740 self
._v
3_to
_v
4_txn
(v3_req
.transaction()))
3741 elif v3_req
.strong():
3742 v4_req
.mutable_read_options().set_read_consistency(
3743 datastore_v4_pb
.ReadOptions
.STRONG
)
3744 elif v3_req
.has_failover_ms():
3745 v4_req
.mutable_read_options().set_read_consistency(
3746 datastore_v4_pb
.ReadOptions
.EVENTUAL
)
3747 if v3_req
.has_min_safe_time_seconds():
3748 v4_req
.set_min_safe_time_seconds(v3_req
.min_safe_time_seconds())
3750 self
._query
_converter
.v3_to_v4_query(v3_req
, v4_req
.mutable_query())
3754 def v4_run_query_resp_to_v3_query_result(self
, v4_resp
):
3755 """Converts a V4 RunQueryResponse to a v3 QueryResult.
3758 v4_resp: a datastore_v4_pb.QueryResult
3761 a datastore_pb.QueryResult
3763 v3_resp
= self
.v4_to_v3_query_result(v4_resp
.batch())
3766 if v4_resp
.has_query_handle():
3767 self
.v4_to_v3_cursor(v4_resp
.query_handle(), v3_resp
.mutable_cursor())
3771 def v3_to_v4_run_query_resp(self
, v3_resp
):
3772 """Converts a v3 QueryResult to a V4 RunQueryResponse.
3775 v3_resp: a datastore_pb.QueryResult
3778 a datastore_v4_pb.RunQueryResponse
3780 v4_resp
= datastore_v4_pb
.RunQueryResponse()
3781 self
.v3_to_v4_query_result_batch(v3_resp
, v4_resp
.mutable_batch())
3783 if v3_resp
.has_cursor():
3784 v4_resp
.set_query_handle(
3785 self
._query
_converter
.v3_to_v4_compiled_cursor(v3_resp
.cursor()))
3792 def v4_to_v3_next_req(self
, v4_req
):
3793 """Converts a v4 ContinueQueryRequest to a v3 NextRequest.
3796 v4_req: a datastore_v4_pb.ContinueQueryRequest
3799 a datastore_pb.NextRequest
3801 v3_req
= datastore_pb
.NextRequest()
3802 v3_req
.set_compile(True)
3803 self
.v4_to_v3_cursor(v4_req
.query_handle(), v3_req
.mutable_cursor())
3806 def v3_to_v4_continue_query_resp(self
, v3_resp
):
3807 """Converts a v3 QueryResult to a v4 ContinueQueryResponse.
3810 v3_resp: a datstore_pb.QueryResult
3813 a datastore_v4_pb.ContinueQueryResponse
3815 v4_resp
= datastore_v4_pb
.ContinueQueryResponse()
3816 self
.v3_to_v4_query_result_batch(v3_resp
, v4_resp
.mutable_batch())
3822 def v4_to_v3_get_req(self
, v4_req
):
3823 """Converts a v4 LookupRequest to a v3 GetRequest.
3826 v4_req: a datastore_v4_pb.LookupRequest
3829 a datastore_pb.GetRequest
3831 v3_req
= datastore_pb
.GetRequest()
3832 v3_req
.set_allow_deferred(True)
3835 if v4_req
.read_options().has_transaction():
3836 self
.v4_to_v3_txn(v4_req
.read_options().transaction(),
3837 v3_req
.mutable_transaction())
3838 elif (v4_req
.read_options().read_consistency()
3839 == datastore_v4_pb
.ReadOptions
.EVENTUAL
):
3840 v3_req
.set_strong(False)
3841 v3_req
.set_failover_ms(-1)
3842 elif (v4_req
.read_options().read_consistency()
3843 == datastore_v4_pb
.ReadOptions
.STRONG
):
3844 v3_req
.set_strong(True)
3846 for v4_key
in v4_req
.key_list():
3847 self
._entity
_converter
.v4_to_v3_reference(v4_key
, v3_req
.add_key())
3851 def v3_to_v4_lookup_resp(self
, v3_resp
):
3852 """Converts a v3 GetResponse to a v4 LookupResponse.
3855 v3_resp: a datastore_pb.GetResponse
3858 a datastore_v4_pb.LookupResponse
3860 v4_resp
= datastore_v4_pb
.LookupResponse()
3862 for v3_ref
in v3_resp
.deferred_list():
3863 self
._entity
_converter
.v3_to_v4_key(v3_ref
, v4_resp
.add_deferred())
3864 for v3_entity
in v3_resp
.entity_list():
3865 if v3_entity
.has_entity():
3866 self
._entity
_converter
.v3_to_v4_entity(
3868 v4_resp
.add_found().mutable_entity())
3869 if v3_entity
.has_key():
3870 self
._entity
_converter
.v3_to_v4_key(
3872 v4_resp
.add_missing().mutable_entity().mutable_key())
3876 def v4_to_v3_query_result(self
, v4_batch
):
3877 """Converts a v4 QueryResultBatch to a v3 QueryResult.
3880 v4_batch: a datastore_v4_pb.QueryResultBatch
3883 a datastore_pb.QueryResult
3885 v3_result
= datastore_pb
.QueryResult()
3888 v3_result
.set_more_results(
3889 (v4_batch
.more_results()
3890 == datastore_v4_pb
.QueryResultBatch
.NOT_FINISHED
))
3891 if v4_batch
.has_end_cursor():
3892 self
._query
_converter
.v4_to_v3_compiled_cursor(
3893 v4_batch
.end_cursor(), v3_result
.mutable_compiled_cursor())
3894 if v4_batch
.has_skipped_cursor():
3895 self
._query
_converter
.v4_to_v3_compiled_cursor(
3896 v4_batch
.skipped_cursor(),
3897 v3_result
.mutable_skipped_results_compiled_cursor())
3900 if v4_batch
.entity_result_type() == datastore_v4_pb
.EntityResult
.PROJECTION
:
3901 v3_result
.set_index_only(True)
3902 elif v4_batch
.entity_result_type() == datastore_v4_pb
.EntityResult
.KEY_ONLY
:
3903 v3_result
.set_keys_only(True)
3906 if v4_batch
.has_skipped_results():
3907 v3_result
.set_skipped_results(v4_batch
.skipped_results())
3908 for v4_entity
in v4_batch
.entity_result_list():
3909 v3_entity
= v3_result
.add_result()
3910 self
._entity
_converter
.v4_to_v3_entity(v4_entity
.entity(), v3_entity
)
3911 if v4_entity
.has_cursor():
3912 cursor
= v3_result
.add_result_compiled_cursor()
3913 self
._query
_converter
.v4_to_v3_compiled_cursor(v4_entity
.cursor(),
3915 if v4_batch
.entity_result_type() != datastore_v4_pb
.EntityResult
.FULL
:
3918 v3_entity
.clear_entity_group()
3922 def v3_to_v4_query_result_batch(self
, v3_result
, v4_batch
):
3923 """Converts a v3 QueryResult to a v4 QueryResultBatch.
3926 v3_result: a datastore_pb.QueryResult
3927 v4_batch: a datastore_v4_pb.QueryResultBatch to populate
3932 if v3_result
.more_results():
3933 v4_batch
.set_more_results(datastore_v4_pb
.QueryResultBatch
.NOT_FINISHED
)
3935 v4_batch
.set_more_results(
3936 datastore_v4_pb
.QueryResultBatch
.MORE_RESULTS_AFTER_LIMIT
)
3937 if v3_result
.has_compiled_cursor():
3938 v4_batch
.set_end_cursor(
3939 self
._query
_converter
.v3_to_v4_compiled_cursor(
3940 v3_result
.compiled_cursor()))
3941 if v3_result
.has_skipped_results_compiled_cursor():
3942 v4_batch
.set_skipped_cursor(
3943 self
._query
_converter
.v3_to_v4_compiled_cursor(
3944 v3_result
.skipped_results_compiled_cursor()))
3947 if v3_result
.keys_only():
3948 v4_batch
.set_entity_result_type(datastore_v4_pb
.EntityResult
.KEY_ONLY
)
3949 elif v3_result
.index_only():
3950 v4_batch
.set_entity_result_type(datastore_v4_pb
.EntityResult
.PROJECTION
)
3952 v4_batch
.set_entity_result_type(datastore_v4_pb
.EntityResult
.FULL
)
3955 if v3_result
.has_skipped_results():
3956 v4_batch
.set_skipped_results(v3_result
.skipped_results())
3957 for v3_entity
, v3_cursor
in itertools
.izip_longest(
3958 v3_result
.result_list(),
3959 v3_result
.result_compiled_cursor_list()):
3960 v4_entity_result
= datastore_v4_pb
.EntityResult()
3961 self
._entity
_converter
.v3_to_v4_entity(v3_entity
,
3962 v4_entity_result
.mutable_entity())
3963 if v3_cursor
is not None:
3964 v4_entity_result
.set_cursor(
3965 self
._query
_converter
.v3_to_v4_compiled_cursor(v3_cursor
))
3966 v4_batch
.entity_result_list().append(v4_entity_result
)
3970 __service_converter
= StubServiceConverter(
3971 datastore_pbs
.get_entity_converter(), __query_converter
)
3974 def get_service_converter():
3975 """Returns a converter for v3 and v4 service request/response protos.
3977 This converter is suitable for use in stubs but not for production.
3980 a StubServiceConverter
3982 return __service_converter
3985 def ReverseBitsInt64(v
):
3986 """Reverse the bits of a 64-bit integer.
3989 v: Input integer of type 'int' or 'long'.
3992 Bit-reversed input as 'int' on 64-bit machines or as 'long' otherwise.
3995 v
= ((v
>> 1) & 0x5555555555555555) |
((v
& 0x5555555555555555) << 1)
3996 v
= ((v
>> 2) & 0x3333333333333333) |
((v
& 0x3333333333333333) << 2)
3997 v
= ((v
>> 4) & 0x0F0F0F0F0F0F0F0F) |
((v
& 0x0F0F0F0F0F0F0F0F) << 4)
3998 v
= ((v
>> 8) & 0x00FF00FF00FF00FF) |
((v
& 0x00FF00FF00FF00FF) << 8)
3999 v
= ((v
>> 16) & 0x0000FFFF0000FFFF) |
((v
& 0x0000FFFF0000FFFF) << 16)
4000 v
= int((v
>> 32) |
(v
<< 32) & 0xFFFFFFFFFFFFFFFF)
4004 def ToScatteredId(v
):
4005 """Map counter value v to the scattered ID space.
4007 Translate to scattered ID space, then reverse bits.
4010 v: Counter value from which to produce ID.
4016 datastore_errors.BadArgumentError if counter value exceeds the range of
4017 the scattered ID space.
4019 if v
>= _MAX_SCATTERED_COUNTER
:
4020 raise datastore_errors
.BadArgumentError('counter value too large (%d)' %v
)
4021 return _MAX_SEQUENTIAL_ID
+ 1 + long(ReverseBitsInt64(v
<< _SCATTER_SHIFT
))
4025 """Map ID k to the counter value from which it was generated.
4027 Determine whether k is sequential or scattered ID.
4030 k: ID from which to infer counter value.
4033 Tuple of integers (counter_value, id_space).
4035 if k
> _MAX_SCATTERED_ID
:
4037 elif k
> _MAX_SEQUENTIAL_ID
and k
<= _MAX_SCATTERED_ID
:
4038 return long(ReverseBitsInt64(k
) >> _SCATTER_SHIFT
), SCATTERED
4040 return long(k
), SEQUENTIAL
4042 raise datastore_errors
.BadArgumentError('invalid id (%d)' % k
)
4045 def CompareEntityPbByKey(a
, b
):
4046 """Compare two entity protobuf's by key.
4049 a: entity_pb.EntityProto to compare
4050 b: entity_pb.EntityProto to compare
4052 <0 if a's key is before b's, =0 if they are the same key, and >0 otherwise.
4054 return cmp(datastore_types
.Key
._FromPb
(a
.key()),
4055 datastore_types
.Key
._FromPb
(b
.key()))
4058 def _GuessOrders(filters
, orders
):
4059 """Guess any implicit ordering.
4061 The datastore gives a logical, but not necessarily predictable, ordering when
4062 orders are not completely explicit. This function guesses at that ordering
4063 (which is better then always ordering by __key__ for tests).
4066 filters: The datastore_pb.Query_Filter that have already been normalized and
4068 orders: The datastore_pb.Query_Order that have already been normalized and
4069 checked. Mutated in place.
4075 for filter_pb
in filters
:
4076 if filter_pb
.op() in datastore_index
.INEQUALITY_OPERATORS
:
4078 order
= datastore_pb
.Query_Order()
4079 order
.set_property(filter_pb
.property(0).name())
4080 orders
.append(order
)
4084 exists_props
= (filter_pb
.property(0).name() for filter_pb
in filters
4085 if filter_pb
.op() == datastore_pb
.Query_Filter
.EXISTS
)
4086 for prop
in sorted(exists_props
):
4087 order
= datastore_pb
.Query_Order()
4088 order
.set_property(prop
)
4089 orders
.append(order
)
4092 if not orders
or orders
[-1].property() != '__key__':
4093 order
= datastore_pb
.Query_Order()
4094 order
.set_property('__key__')
4095 orders
.append(order
)
4099 def _MakeQuery(query_pb
, filters
, orders
, filter_predicate
):
4100 """Make a datastore_query.Query for the given datastore_pb.Query.
4102 Overrides filters and orders in query with the specified arguments.
4105 query_pb: a datastore_pb.Query.
4106 filters: the filters from query.
4107 orders: the orders from query.
4108 filter_predicate: an additional filter of type
4109 datastore_query.FilterPredicate. This is passed along to implement V4
4110 specific filters without changing the entire stub.
4113 A datastore_query.Query for the datastore_pb.Query."""
4119 clone_pb
= datastore_pb
.Query()
4120 clone_pb
.CopyFrom(query_pb
)
4121 clone_pb
.clear_filter()
4122 clone_pb
.clear_order()
4123 clone_pb
.filter_list().extend(filters
)
4124 clone_pb
.order_list().extend(orders
)
4126 query
= datastore_query
.Query
._from
_pb
(clone_pb
)
4128 assert datastore_v4_pb
.CompositeFilter
._Operator
_NAMES
.values() == ['AND']
4133 if filter_predicate
is not None:
4134 if query
.filter_predicate
is not None:
4137 filter_predicate
= datastore_query
.CompositeFilter(
4138 datastore_query
.CompositeFilter
.AND
,
4139 [filter_predicate
, query
.filter_predicate
])
4141 return datastore_query
.Query(app
=query
.app
,
4142 namespace
=query
.namespace
,
4143 ancestor
=query
.ancestor
,
4144 filter_predicate
=filter_predicate
,
4145 group_by
=query
.group_by
,
4150 def _CreateIndexEntities(entity
, postfix_props
):
4151 """Creates entities for index values that would appear in prodcution.
4153 This function finds all multi-valued properties listed in split_props, and
4154 creates a new entity for each unique combination of values. The resulting
4155 entities will only have a single value for each property listed in
4158 It reserves the right to include index data that would not be
4159 seen in production, e.g. by returning the original entity when no splitting
4160 is needed. LoadEntity will remove any excess fields.
4162 This simulates the results seen by an index scan in the datastore.
4165 entity: The entity_pb.EntityProto to split.
4166 split_props: A set of property names to split on.
4169 A list of the split entity_pb.EntityProtos.
4172 split_required
= False
4174 for prop
in entity
.property_list():
4175 if prop
.name() in postfix_props
:
4176 values
= to_split
.get(prop
.name())
4179 to_split
[prop
.name()] = values
4182 split_required
= True
4183 if prop
.value() not in values
:
4184 values
.append(prop
.value())
4186 base_props
.append(prop
)
4188 if not split_required
:
4192 clone
= entity_pb
.EntityProto()
4193 clone
.CopyFrom(entity
)
4194 clone
.clear_property()
4195 clone
.property_list().extend(base_props
)
4198 for name
, splits
in to_split
.iteritems():
4199 if len(splits
) == 1:
4201 for result
in results
:
4202 prop
= result
.add_property()
4204 prop
.set_multiple(False)
4205 prop
.set_meaning(entity_pb
.Property
.INDEX_VALUE
)
4206 prop
.mutable_value().CopyFrom(splits
[0])
4210 for result
in results
:
4211 for split
in splits
:
4212 clone
= entity_pb
.EntityProto()
4213 clone
.CopyFrom(result
)
4214 prop
= clone
.add_property()
4216 prop
.set_multiple(False)
4217 prop
.set_meaning(entity_pb
.Property
.INDEX_VALUE
)
4218 prop
.mutable_value().CopyFrom(split
)
4219 new_results
.append(clone
)
4220 results
= new_results
4224 def _CreateIndexOnlyQueryResults(results
, postfix_props
):
4225 """Creates a result set similar to that returned by an index only query."""
4227 for result
in results
:
4228 new_results
.extend(_CreateIndexEntities(result
, postfix_props
))
4232 def _ExecuteQuery(results
, query
, filters
, orders
, index_list
,
4233 filter_predicate
=None):
4234 """Executes the query on a superset of its results.
4237 results: superset of results for query.
4238 query: a datastore_pb.Query.
4239 filters: the filters from query.
4240 orders: the orders from query.
4241 index_list: the list of indexes used by the query.
4242 filter_predicate: an additional filter of type
4243 datastore_query.FilterPredicate. This is passed along to implement V4
4244 specific filters without changing the entire stub.
4247 A ListCursor over the results of applying query to results.
4249 orders
= _GuessOrders(filters
, orders
)
4250 dsquery
= _MakeQuery(query
, filters
, orders
, filter_predicate
)
4252 if query
.property_name_size():
4253 results
= _CreateIndexOnlyQueryResults(
4254 results
, set(order
.property() for order
in orders
))
4256 return ListCursor(query
, dsquery
, orders
, index_list
,
4257 datastore_query
.apply_query(dsquery
, results
))
4260 def _UpdateCost(cost
, entity_writes
, index_writes
):
4261 """Updates the provided cost.
4264 cost: Out param. The cost object to update.
4265 entity_writes: The number of entity writes to add.
4266 index_writes: The number of index writes to add.
4268 cost
.set_entity_writes(cost
.entity_writes() + entity_writes
)
4269 cost
.set_index_writes(cost
.index_writes() + index_writes
)
4272 def _CalculateWriteOps(composite_indexes
, old_entity
, new_entity
):
4273 """Determines number of entity and index writes needed to write new_entity.
4275 We assume that old_entity represents the current state of the Datastore.
4278 composite_indexes: The composite_indexes for the kind of the entities.
4279 old_entity: Entity representing the current state in the Datstore.
4280 new_entity: Entity representing the desired state in the Datstore.
4283 A tuple of size 2, where the first value is the number of entity writes and
4284 the second value is the number of index writes.
4286 if (old_entity
is not None and
4287 old_entity
.property_list() == new_entity
.property_list()
4288 and old_entity
.raw_property_list() == new_entity
.raw_property_list()):
4291 index_writes
= _ChangedIndexRows(composite_indexes
, old_entity
, new_entity
)
4292 if old_entity
is None:
4298 return 1, index_writes
4301 def _ChangedIndexRows(composite_indexes
, old_entity
, new_entity
):
4302 """Determine the number of index rows that need to change.
4304 We assume that old_entity represents the current state of the Datastore.
4307 composite_indexes: The composite_indexes for the kind of the entities.
4308 old_entity: Entity representing the current state in the Datastore.
4309 new_entity: Entity representing the desired state in the Datastore
4312 The number of index rows that need to change.
4317 unique_old_properties
= collections
.defaultdict(set)
4322 unique_new_properties
= collections
.defaultdict(set)
4324 if old_entity
is not None:
4325 for old_prop
in old_entity
.property_list():
4326 _PopulateUniquePropertiesSet(old_prop
, unique_old_properties
)
4329 unchanged
= collections
.defaultdict(int)
4331 for new_prop
in new_entity
.property_list():
4332 new_prop_as_str
= _PopulateUniquePropertiesSet(
4333 new_prop
, unique_new_properties
)
4334 if new_prop_as_str
in unique_old_properties
[new_prop
.name()]:
4335 unchanged
[new_prop
.name()] += 1
4340 all_property_names
= set(unique_old_properties
.iterkeys())
4341 all_property_names
.update(unique_old_properties
.iterkeys())
4342 all_property_names
.update(unchanged
.iterkeys())
4344 all_indexes
= _GetEntityByPropertyIndexes(all_property_names
)
4345 all_indexes
.extend([comp
.definition() for comp
in composite_indexes
])
4346 path_size
= new_entity
.key().path().element_size()
4348 for index
in all_indexes
:
4352 ancestor_multiplier
= 1
4353 if index
.ancestor() and index
.property_size() > 1:
4354 ancestor_multiplier
= path_size
4355 writes
+= (_CalculateWritesForCompositeIndex(
4356 index
, unique_old_properties
, unique_new_properties
, unchanged
) *
4357 ancestor_multiplier
)
4361 def _PopulateUniquePropertiesSet(prop
, unique_properties
):
4362 """Populates a set containing unique properties.
4365 prop: An entity property.
4366 unique_properties: Dictionary mapping property names to a set of unique
4370 The property pb in string (hashable) form.
4373 prop
= _CopyAndSetMultipleToFalse(prop
)
4376 prop_as_str
= prop
.SerializePartialToString()
4377 unique_properties
[prop
.name()].add(prop_as_str
)
4381 def _CalculateWritesForCompositeIndex(index
, unique_old_properties
,
4382 unique_new_properties
,
4384 """Calculate the number of writes required to maintain a specific Index.
4387 index: The composite index.
4388 unique_old_properties: Dictionary mapping property names to a set of props
4389 present on the old entity.
4390 unique_new_properties: Dictionary mapping property names to a set of props
4391 present on the new entity.
4392 common_properties: Dictionary mapping property names to the number of
4393 properties with that name that are present on both the old and new
4397 The number of writes required to maintained the provided index.
4402 for prop
in index
.property_list():
4403 old_count
*= len(unique_old_properties
[prop
.name()])
4404 new_count
*= len(unique_new_properties
[prop
.name()])
4405 common_count
*= common_properties
[prop
.name()]
4407 return (old_count
- common_count
) + (new_count
- common_count
)
4410 def _GetEntityByPropertyIndexes(all_property_names
):
4412 for prop_name
in all_property_names
:
4414 _SinglePropertyIndex(prop_name
, entity_pb
.Index_Property
.ASCENDING
))
4416 _SinglePropertyIndex(prop_name
, entity_pb
.Index_Property
.DESCENDING
))
4420 def _SinglePropertyIndex(prop_name
, direction
):
4421 """Creates a single property Index for the given name and direction.
4424 prop_name: The name of the single property on the Index.
4425 direction: The direction of the Index.
4428 A single property Index with the given property and direction.
4430 index
= entity_pb
.Index()
4431 prop
= index
.add_property()
4432 prop
.set_name(prop_name
)
4433 prop
.set_direction(direction
)
4437 def _CopyAndSetMultipleToFalse(prop
):
4438 """Copy the provided Property and set its "multiple" attribute to False.
4441 prop: The Property to copy.
4444 A copy of the given Property with its "multiple" attribute set to False.
4451 prop_copy
= entity_pb
.Property()
4452 prop_copy
.MergeFrom(prop
)
4453 prop_copy
.set_multiple(False)
4457 def _SetStartInclusive(position
, first_direction
):
4458 """Sets the start_inclusive field in position.
4461 position: datastore_pb.Position
4462 first_direction: the first sort order from the query
4463 (a datastore_pb.Query_Order) or None
4465 position
.set_start_inclusive(
4466 position
.before_ascending()
4467 != (first_direction
== datastore_pb
.Query_Order
.DESCENDING
))
4470 def _SetBeforeAscending(position
, first_direction
):
4471 """Sets the before_ascending field in position.
4474 position: datastore_pb.Position
4475 first_direction: the first sort order from the query
4476 (a datastore_pb.Query_Order) or None
4478 position
.set_before_ascending(
4479 position
.start_inclusive()
4480 != (first_direction
== datastore_pb
.Query_Order
.DESCENDING
))