App Engine Python SDK version 1.8.1
[gae.git] / python / google / appengine / datastore / datastore_rpc.py
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


"""Asynchronous datastore API.

This is designed to be the lowest-level API to be used by all Python
datastore client libraries.

A refactoring is in progress to rebuild datastore.py on top of this,
while remaining nearly 100% backwards compatible.  A new (not intended
to be compatible) library to replace db.py is also under development.
"""

__all__ = ['AbstractAdapter',
           'BaseConfiguration',
           'BaseConnection',
           'ConfigOption',
           'Configuration',
           'Connection',
           'IdentityAdapter',
           'MultiRpc',
           'TransactionalConnection',
           'TransactionOptions',
          ]

import collections
import functools
import logging

from google.appengine.datastore import entity_pb

from google.appengine.api import api_base_pb
from google.appengine.api import apiproxy_rpc
from google.appengine.api import apiproxy_stub_map
from google.appengine.api import datastore_errors
from google.appengine.api import datastore_types
from google.appengine.api.app_identity import app_identity
from google.appengine.datastore import datastore_pb
from google.appengine.datastore import datastore_v4a_pb
from google.appengine.datastore import entity_v4_pb
from google.appengine.runtime import apiproxy_errors


_MAX_ID_BATCH_SIZE = 1000 * 1000 * 1000


def _positional(max_pos_args):
  """A decorator to declare that only the first N arguments may be positional.

  Note that for methods, N includes 'self'.
  """
  def positional_decorator(wrapped):
    @functools.wraps(wrapped)
    def positional_wrapper(*args, **kwds):
      if len(args) > max_pos_args:
        plural_s = ''
        if max_pos_args != 1:
          plural_s = 's'
        raise TypeError(
          '%s() takes at most %d positional argument%s (%d given)' %
          (wrapped.__name__, max_pos_args, plural_s, len(args)))
      return wrapped(*args, **kwds)
    return positional_wrapper
  return positional_decorator
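
# Example (illustrative only, not part of the original source): a function
# declared with _positional(1) accepts at most one positional argument; any
# further arguments must be passed by keyword.
#
#   @_positional(1)
#   def connect(name, config=None):
#     ...
#
#   connect('db')                # OK
#   connect('db', config=None)   # OK: second argument passed by keyword
#   connect('db', None)          # raises TypeError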


def _GetDatastoreType(app=None):
  """Tries to get the datastore type for the given app.

  This function is only guaranteed to return something other than
  UNKNOWN_DATASTORE when running in production and querying the current app.
  """
  current_app = datastore_types.ResolveAppId(None)
  if app not in (current_app, None):
    return BaseConnection.UNKNOWN_DATASTORE

  partition, _, _ = app_identity._ParseFullAppId(current_app)
  if partition:
    return BaseConnection.HIGH_REPLICATION_DATASTORE
  return BaseConnection.MASTER_SLAVE_DATASTORE


class AbstractAdapter(object):
  """Abstract interface between protobufs and user-level classes.

  This class defines conversions between the protobuf classes defined
  in entity_pb.py on the one hand, and the corresponding user-level
  classes (which are defined by higher-level API libraries such as
  datastore.py or db.py) on the other hand.

  The premise is that the code in this module is agnostic about the
  user-level classes used to represent keys and entities, while at the
  same time providing APIs that accept or return such user-level
  classes.

  Higher-level libraries must subclass this abstract class and pass an
  instance of the subclass to the Connection they want to use.

  These methods may raise datastore_errors.Error for bad inputs.
  """

  def pb_to_key(self, pb):
    """Turn an entity_pb.Reference into a user-level key."""
    raise NotImplementedError

  def pb_to_entity(self, pb):
    """Turn an entity_pb.EntityProto into a user-level entity."""
    raise NotImplementedError

  def pb_to_index(self, pb):
    """Turn an entity_pb.CompositeIndex into a user-level Index
    representation."""
    raise NotImplementedError

  def pb_to_query_result(self, pb, query_options):
    """Turn an entity_pb.EntityProto into a user-level query result."""
    if query_options.keys_only:
      return self.pb_to_key(pb.key())
    else:
      return self.pb_to_entity(pb)

  def key_to_pb(self, key):
    """Turn a user-level key into an entity_pb.Reference."""
    raise NotImplementedError

  def entity_to_pb(self, entity):
    """Turn a user-level entity into an entity_pb.EntityProto."""
    raise NotImplementedError

  def new_key_pb(self):
    """Create a new, empty entity_pb.Reference."""
    return entity_pb.Reference()

  def new_entity_pb(self):
    """Create a new, empty entity_pb.EntityProto."""
    return entity_pb.EntityProto()


class IdentityAdapter(AbstractAdapter):
  """A concrete adapter that implements the identity mapping.

  This is used as the default when a Connection is created without
  specifying an adapter; that's primarily for testing.
  """

  def pb_to_key(self, pb):
    return pb

  def pb_to_entity(self, pb):
    return pb

  def key_to_pb(self, key):
    return key

  def entity_to_pb(self, entity):
    return entity

  def pb_to_index(self, pb):
    return pb
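
# Example (hedged sketch, not part of the original source): a minimal custom
# adapter.  'MyKey' and 'MyEntity' are hypothetical user-level classes
# assumed to provide from_pb()/to_pb() conversions.
#
#   class MyAdapter(AbstractAdapter):
#
#     def pb_to_key(self, pb):
#       return MyKey.from_pb(pb)       # entity_pb.Reference -> MyKey
#
#     def pb_to_entity(self, pb):
#       return MyEntity.from_pb(pb)    # entity_pb.EntityProto -> MyEntity
#
#     def key_to_pb(self, key):
#       return key.to_pb()             # MyKey -> entity_pb.Reference
#
#     def entity_to_pb(self, entity):
#       return entity.to_pb()          # MyEntity -> entity_pb.EntityProto
#
#     def pb_to_index(self, pb):
#       return pb                      # pass indexes through unchanged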


class ConfigOption(object):
  """A descriptor for a Configuration option.

  This class is used to create a configuration option on a class that
  inherits from BaseConfiguration.  A validator function decorated with this
  class will be converted to a read-only descriptor, and BaseConfiguration
  will implement constructor and merging logic for that configuration option.
  A validator function takes a single non-None value to validate and either
  throws an exception or returns that value (or an equivalent value).  A
  validator is called once at construction time, but only if a non-None value
  for the configuration option is specified in the constructor's keyword
  arguments.
  """

  def __init__(self, validator):
    self.validator = validator

  def __get__(self, obj, objtype):
    if obj is None:
      return self
    return obj._values.get(self.validator.__name__, None)

  def __set__(self, obj, value):
    raise AttributeError('Configuration options are immutable (%s)' %
                         (self.validator.__name__,))

  def __call__(self, *args):
    """Gets the first non-None value for this option from the given args.

    Args:
      *args: Any number of configuration objects or None values.

    Returns:
      The first value for this ConfigOption found in the given configuration
      objects, or None.

    Raises:
      datastore_errors.BadArgumentError if a given object is not a
      configuration object.
    """
    name = self.validator.__name__
    for config in args:

      if isinstance(config, (type(None), apiproxy_stub_map.UserRPC)):
        pass
      elif not isinstance(config, BaseConfiguration):
        raise datastore_errors.BadArgumentError(
            'invalid config argument (%r)' % (config,))
      elif name in config._values and self is config._options[name]:
        return config._values[name]
    return None


class _ConfigurationMetaClass(type):
  """The metaclass for all Configuration types.

  This class is needed to store a class-specific list of all ConfigOptions in
  cls._options, and to insert a __slots__ variable into the class dict before
  the class is created to impose immutability.
  """

  def __new__(metaclass, classname, bases, classDict):
    if classname == '_MergedConfiguration':
      # _MergedConfiguration is handled specially; it defines its own
      # __slots__ and builds its _options per instance.
      return type.__new__(metaclass, classname, bases, classDict)

    if object in bases:
      classDict['__slots__'] = ['_values']
    else:
      classDict['__slots__'] = []
    cls = type.__new__(metaclass, classname, bases, classDict)
    if object not in bases:
      options = {}
      for c in reversed(cls.__mro__):
        if '_options' in c.__dict__:
          options.update(c.__dict__['_options'])
      cls._options = options
      for option, value in cls.__dict__.iteritems():
        if isinstance(value, ConfigOption):
          if option in cls._options:
            raise TypeError('%s cannot be overridden (%s)' %
                            (option, cls.__name__))
          cls._options[option] = value
          value._cls = cls
    return cls


class BaseConfiguration(object):
  """A base class for a configuration object.

  Subclasses should provide validation functions for every configuration
  option they accept.  Any public function decorated with ConfigOption is
  assumed to be a validation function for an option of the same name.  All
  validation functions take a single non-None value to validate and must
  throw an exception or return the value to store.

  This class forces subclasses to be immutable and exposes a read-only
  property for every accepted configuration option.  Configuration options
  are set by passing keyword arguments to the constructor.  The constructor
  and merge function are designed to avoid creating redundant copies and may
  return the configuration objects passed to them if appropriate.

  Setting an option to None is the same as not specifying the option except
  in the case where the 'config' argument is given.  In this case the value
  on 'config' of the same name is ignored.  Options that are not specified
  will return 'None' when accessed.
  """

  __metaclass__ = _ConfigurationMetaClass
  _options = {}

  def __new__(cls, config=None, **kwargs):
    """Immutable constructor.

    If 'config' is non-None all configuration options will default to the
    value it contains unless the configuration option is explicitly set to
    'None' in the keyword arguments.  If 'config' is None then all
    configuration options default to None.

    Args:
      config: Optional base configuration providing default values for
        parameters not specified in the keyword arguments.
      **kwargs: Configuration options to store on this object.

    Returns:
      Either a new Configuration object or (if it would be equivalent)
      the config argument unchanged, but never None.
    """
    if config is None:
      pass
    elif isinstance(config, BaseConfiguration):
      if cls is config.__class__ and config.__is_stronger(**kwargs):
        return config

      for key, value in config._values.iteritems():

        if issubclass(cls, config._options[key]._cls):
          kwargs.setdefault(key, value)
    else:
      raise datastore_errors.BadArgumentError(
          'config argument should be Configuration (%r)' % (config,))

    obj = super(BaseConfiguration, cls).__new__(cls)
    obj._values = {}
    for key, value in kwargs.iteritems():
      if value is not None:
        try:
          config_option = obj._options[key]
        except KeyError, err:
          raise TypeError('Unknown configuration option (%s)' % err)
        value = config_option.validator(value)
        if value is not None:
          obj._values[key] = value
    return obj

  def __eq__(self, other):
    if self is other:
      return True
    if not isinstance(other, BaseConfiguration):
      return NotImplemented
    return self._options == other._options and self._values == other._values

  def __ne__(self, other):
    equal = self.__eq__(other)
    if equal is NotImplemented:
      return equal
    return not equal

  def __hash__(self):
    return (hash(frozenset(self._values.iteritems())) ^
            hash(frozenset(self._options.iteritems())))

  def __repr__(self):
    args = []
    for key_value in sorted(self._values.iteritems()):
      args.append('%s=%r' % key_value)
    return '%s(%s)' % (self.__class__.__name__, ', '.join(args))

  def __is_stronger(self, **kwargs):
    """Internal helper to ask whether a configuration is stronger than another.

    A configuration is stronger when it contains every name/value pair in
    kwargs.

    Example: a configuration with:
      (deadline=5, on_configuration=None, read_policy=EVENTUAL_CONSISTENCY)
    is stronger than:
      (deadline=5, on_configuration=None)
    but not stronger than:
      (deadline=5, on_configuration=None, read_policy=None)
    or
      (deadline=10, on_configuration=None, read_policy=None).

    More formally:
      - Any value is stronger than an unset value;
      - Any value is stronger than itself.

    Returns:
      True if each of the self attributes is stronger than the
      corresponding argument.
    """
    for key, value in kwargs.iteritems():
      if key not in self._values or value != self._values[key]:
        return False
    return True

  @classmethod
  def is_configuration(cls, obj):
    """True if configuration obj handles all the options of this class.

    Use this method rather than isinstance(obj, cls) to test if a
    configuration object handles the options of cls (is_configuration
    is handled specially for results of merge which may handle the options
    of unrelated configuration classes).

    Args:
      obj: the object to test.
    """
    return isinstance(obj, BaseConfiguration) and obj._is_configuration(cls)

  def _is_configuration(self, cls):
    return isinstance(self, cls)

  def merge(self, config):
    """Merge two configurations.

    The configuration given as an argument (if any) takes priority;
    defaults are filled in from the current configuration.

    Args:
      config: Configuration providing overrides, or None (but cannot
        be omitted).

    Returns:
      Either a new configuration object or (if it would be equivalent)
      self or the config argument unchanged, but never None.

    Raises:
      BadArgumentError if self or config are of configuration classes
      with conflicting options (i.e. the same option name defined in
      two different configuration classes).
    """
    if config is None or config is self:

      return self

    if not (isinstance(config, _MergedConfiguration) or
            isinstance(self, _MergedConfiguration)):

      if isinstance(config, self.__class__):
        for key in self._values:
          if key not in config._values:
            break
        else:
          return config
      if isinstance(self, config.__class__):
        if self.__is_stronger(**config._values):
          return self

      def _quick_merge(obj):
        obj._values = self._values.copy()
        obj._values.update(config._values)
        return obj

      if isinstance(config, self.__class__):
        return _quick_merge(type(config)())
      if isinstance(self, config.__class__):
        return _quick_merge(type(self)())

    return _MergedConfiguration(config, self)

  def __getstate__(self):
    return {'_values': self._values}

  def __setstate__(self, state):

    obj = self.__class__(**state['_values'])
    self._values = obj._values
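
# Example (illustrative sketch, not part of the original source): defining a
# custom configuration class with one validated option, then relying on the
# immutable-constructor and merge semantics documented above.  'MyConfig'
# and 'retries' are hypothetical names.
#
#   class MyConfig(BaseConfiguration):
#
#     @ConfigOption
#     def retries(value):
#       if not isinstance(value, (int, long)) or value < 0:
#         raise datastore_errors.BadArgumentError(
#             'retries argument should be a non-negative integer')
#       return value
#
#   a = MyConfig(retries=3)
#   b = MyConfig(a)                    # returns a unchanged (a is stronger)
#   c = a.merge(MyConfig(retries=5))   # the merge argument takes priority
#   assert a.retries == 3 and b is a and c.retries == 5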


class _MergedConfiguration(BaseConfiguration):
  """Helper class to handle merges of configurations.

  Instances of _MergedConfiguration are in some sense "subclasses" of the
  argument configurations, i.e.:
  - they handle exactly the configuration options of the argument
    configurations;
  - the value of these options is taken in priority order from the arguments;
  - isinstance is true on this configuration if it is true on any of the
    argument configurations.
  This class raises an exception if two argument configurations have an
  option with the same name but coming from a different configuration class.
  """
  __slots__ = ['_values', '_configs', '_options', '_classes']

  def __new__(cls, *configs):
    obj = super(BaseConfiguration, cls).__new__(cls)
    obj._configs = configs

    obj._options = {}
    for config in configs:
      for name, option in config._options.iteritems():
        if name in obj._options:
          if option is not obj._options[name]:
            error = ("merge conflict on '%s' from '%s' and '%s'" %
                     (name, option._cls.__name__,
                      obj._options[name]._cls.__name__))
            raise datastore_errors.BadArgumentError(error)
        obj._options[name] = option

    obj._values = {}
    for config in reversed(configs):
      for name, value in config._values.iteritems():
        obj._values[name] = value

    return obj

  def __repr__(self):
    return '%s%r' % (self.__class__.__name__, tuple(self._configs))

  def _is_configuration(self, cls):
    for config in self._configs:
      if config._is_configuration(cls):
        return True
    return False

  def __getattr__(self, name):
    if name in self._options:
      if name in self._values:
        return self._values[name]
      else:
        return None
    raise AttributeError("Configuration has no attribute '%s'" % (name,))

  def __getstate__(self):
    return {'_configs': self._configs}

  def __setstate__(self, state):

    obj = _MergedConfiguration(*state['_configs'])
    self._values = obj._values
    self._configs = obj._configs
    self._options = obj._options


class Configuration(BaseConfiguration):
  """Configuration parameters for datastore RPCs.

  This class reserves the right to define configuration options of any name
  except those that start with 'user_'.  External subclasses should only
  define functions or variables with names that start with 'user_'.

  The options defined on this class include generic RPC parameters (deadline)
  but also datastore-specific parameters (on_completion and read_policy).

  Options are set by passing keyword arguments to the constructor
  corresponding to the configuration options defined below.
  """

  STRONG_CONSISTENCY = 0
  """A read consistency that will return up-to-date results."""

  EVENTUAL_CONSISTENCY = 1
  """A read consistency that allows requests to return possibly stale results.

  This read_policy tends to be faster and less prone to unavailability or
  timeouts.  May return transactionally inconsistent results in rare cases.
  """

  APPLY_ALL_JOBS_CONSISTENCY = 2
  """A read consistency that aggressively tries to find write jobs to apply.

  Use of this read policy is strongly discouraged.

  This read_policy tends to be more costly and is only useful in a few
  specific cases.  It is equivalent to splitting a request by entity group
  and wrapping each batch in a separate transaction.  Cannot be used with
  non-ancestor queries.
  """

  ALL_READ_POLICIES = frozenset((STRONG_CONSISTENCY,
                                 EVENTUAL_CONSISTENCY,
                                 APPLY_ALL_JOBS_CONSISTENCY,
                                 ))

  @ConfigOption
  def deadline(value):
    """The deadline for any RPC issued.

    If unset, the system default will be used, which is typically 5 seconds.

    Raises:
      BadArgumentError if value is not a number or is less than or equal
      to zero.
    """
    if not isinstance(value, (int, long, float)):
      raise datastore_errors.BadArgumentError(
          'deadline argument should be int/long/float (%r)' % (value,))
    if value <= 0:
      raise datastore_errors.BadArgumentError(
          'deadline argument should be > 0 (%r)' % (value,))
    return value

  @ConfigOption
  def on_completion(value):
    """A callback that is invoked when any RPC completes.

    If specified, it will be called with a UserRPC object as argument when an
    RPC completes.

    NOTE: There is a subtle but important difference between
    UserRPC.callback and Configuration.on_completion: on_completion is
    called with the RPC object as its first argument, whereas callback is
    called without arguments.  (Because a Configuration's on_completion
    function can be used with many UserRPC objects, it would be awkward
    if it was called without passing the specific RPC.)
    """

    return value

  @ConfigOption
  def read_policy(value):
    """The read policy to use for any relevant RPC.

    If unset, STRONG_CONSISTENCY will be used.

    Raises:
      BadArgumentError if value is not a known read policy.
    """
    if value not in Configuration.ALL_READ_POLICIES:
      raise datastore_errors.BadArgumentError(
          'read_policy argument invalid (%r)' % (value,))
    return value

  @ConfigOption
  def force_writes(value):
    """Whether a write request should succeed even if the app is read-only.

    This only applies to user-controlled read-only periods.
    """
    if not isinstance(value, bool):
      raise datastore_errors.BadArgumentError(
          'force_writes argument invalid (%r)' % (value,))
    return value

  @ConfigOption
  def max_entity_groups_per_rpc(value):
    """The maximum number of entity groups that can be represented in one rpc.

    For a non-transactional operation that involves more entity groups than
    the maximum, the operation will be performed by executing multiple,
    asynchronous rpcs to the datastore, each of which has no more entity
    groups represented than the maximum.  So, if a put() operation has 8
    entity groups and the maximum is 3, we will send 3 rpcs, 2 with 3 entity
    groups and 1 with 2 entity groups.  This is a performance optimization:
    in many cases multiple, small, concurrent rpcs will finish faster than a
    single large rpc.  The optimal value for this property will be
    application-specific, so experimentation is encouraged.
    """
    if not (isinstance(value, (int, long)) and value > 0):
      raise datastore_errors.BadArgumentError(
          'max_entity_groups_per_rpc should be a positive integer')
    return value

  @ConfigOption
  def max_allocate_ids_keys(value):
    """The maximum number of keys in a v4 AllocateIds rpc."""
    if not (isinstance(value, (int, long)) and value > 0):
      raise datastore_errors.BadArgumentError(
          'max_allocate_ids_keys should be a positive integer')
    return value

  @ConfigOption
  def max_rpc_bytes(value):
    """The maximum serialized size of a Get/Put/Delete without batching."""
    if not (isinstance(value, (int, long)) and value > 0):
      raise datastore_errors.BadArgumentError(
          'max_rpc_bytes should be a positive integer')
    return value

  @ConfigOption
  def max_get_keys(value):
    """The maximum number of keys in a Get without batching."""
    if not (isinstance(value, (int, long)) and value > 0):
      raise datastore_errors.BadArgumentError(
          'max_get_keys should be a positive integer')
    return value

  @ConfigOption
  def max_put_entities(value):
    """The maximum number of entities in a Put without batching."""
    if not (isinstance(value, (int, long)) and value > 0):
      raise datastore_errors.BadArgumentError(
          'max_put_entities should be a positive integer')
    return value

  @ConfigOption
  def max_delete_keys(value):
    """The maximum number of keys in a Delete without batching."""
    if not (isinstance(value, (int, long)) and value > 0):
      raise datastore_errors.BadArgumentError(
          'max_delete_keys should be a positive integer')
    return value
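
# Example (illustrative, not part of the original source): building and
# merging RPC configurations.  Unset options read as None and fall back to
# defaults at call time.
#
#   config = Configuration(deadline=10,
#                          read_policy=Configuration.EVENTUAL_CONSISTENCY)
#   assert config.on_completion is None     # unset options read as None
#
#   # merge(): the argument wins wherever both configurations set an option.
#   override = config.merge(Configuration(deadline=2))
#   assert override.deadline == 2
#   assert override.read_policy == Configuration.EVENTUAL_CONSISTENCY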


class MultiRpc(object):
  """A wrapper around multiple UserRPC objects.

  This provides an API similar to that of UserRPC, but wraps multiple
  RPCs such that e.g. .wait() blocks until all wrapped RPCs are
  complete, and .get_result() returns the combined results from all
  wrapped RPCs.

  Class methods:
    flatten(rpcs): Expand a list of UserRPCs and MultiRpcs
      into a list of UserRPCs.
    wait_any(rpcs): Call UserRPC.wait_any(flatten(rpcs)).
    wait_all(rpcs): Call UserRPC.wait_all(flatten(rpcs)).

  Instance methods:
    wait(): Wait for all RPCs.
    check_success(): Wait and then check success for all RPCs.
    get_result(): Wait for all, check successes, then merge
      all results.

  Instance attributes:
    rpcs: The list of wrapped RPCs (returns a copy).
    state: The combined state of all RPCs.
  """

  def __init__(self, rpcs, extra_hook=None):
    """Constructor.

    Args:
      rpcs: A list of UserRPC and MultiRpc objects; it is flattened
        before being stored.
      extra_hook: Optional function to be applied to the final result
        or list of results.
    """
    self.__rpcs = self.flatten(rpcs)
    self.__extra_hook = extra_hook

  @property
  def rpcs(self):
    """Get a flattened list containing the RPCs wrapped.

    This returns a copy to prevent users from modifying the state.
    """
    return list(self.__rpcs)

  @property
  def state(self):
    """Get the combined state of the wrapped RPCs.

    This mimics the UserRPC.state property.  If all wrapped RPCs have
    the same state, that state is returned; otherwise, RUNNING is
    returned (which here really means 'neither fish nor flesh').
    """
    lo = apiproxy_rpc.RPC.FINISHING
    hi = apiproxy_rpc.RPC.IDLE
    for rpc in self.__rpcs:
      lo = min(lo, rpc.state)
      hi = max(hi, rpc.state)
    if lo == hi:
      return lo
    return apiproxy_rpc.RPC.RUNNING

  def wait(self):
    """Wait for all wrapped RPCs to finish.

    This mimics the UserRPC.wait() method.
    """
    apiproxy_stub_map.UserRPC.wait_all(self.__rpcs)

  def check_success(self):
    """Check success of all wrapped RPCs, failing if any of them failed.

    This mimics the UserRPC.check_success() method.

    NOTE: This first waits for all wrapped RPCs to finish before
    checking the success of any of them.  This makes debugging easier.
    """
    self.wait()
    for rpc in self.__rpcs:
      rpc.check_success()

  def get_result(self):
    """Return the combined results of all wrapped RPCs.

    This mimics the UserRPC.get_result() method.  Multiple results
    are combined using the following rules:

    1. If there are no wrapped RPCs, an empty list is returned.

    2. If exactly one RPC is wrapped, its result is returned.

    3. If more than one RPC is wrapped, the result is always a list,
       which is constructed from the wrapped results as follows:

       a. A wrapped result equal to None is ignored;

       b. A wrapped result that is a list (but not any other type of
          sequence!) has its elements added to the result list;

       c. Any other wrapped result is appended to the result list.

    After all results are combined, if __extra_hook is set, it is
    called with the combined results and its return value becomes the
    final result.

    NOTE: This first waits for all wrapped RPCs to finish, and then
    checks the success of all of them.  This makes debugging easier.
    """

    if len(self.__rpcs) == 1:
      results = self.__rpcs[0].get_result()
    else:
      results = []

      for rpc in self.__rpcs:
        result = rpc.get_result()
        if isinstance(result, list):
          results.extend(result)
        elif result is not None:
          results.append(result)
    if self.__extra_hook is not None:
      results = self.__extra_hook(results)
    return results

  @classmethod
  def flatten(cls, rpcs):
    """Return a list of UserRPCs, expanding MultiRpcs in the argument list.

    For example: given 4 UserRPCs rpc1 through rpc4,
    flatten([rpc1, MultiRpc([rpc2, rpc3]), rpc4])
    returns [rpc1, rpc2, rpc3, rpc4].

    Args:
      rpcs: A list of UserRPC and MultiRpc objects.

    Returns:
      A list of UserRPC objects.
    """
    flat = []
    for rpc in rpcs:
      if isinstance(rpc, MultiRpc):

        flat.extend(rpc.__rpcs)
      else:
        if not isinstance(rpc, apiproxy_stub_map.UserRPC):
          raise datastore_errors.BadArgumentError(
              'Expected a list of UserRPC objects (%r)' % (rpc,))
        flat.append(rpc)
    return flat

  @classmethod
  def wait_any(cls, rpcs):
    """Wait until one of the RPCs passed in is finished.

    This mimics UserRPC.wait_any().

    Args:
      rpcs: A list of UserRPC and MultiRpc objects.

    Returns:
      A UserRPC object or None.
    """
    return apiproxy_stub_map.UserRPC.wait_any(cls.flatten(rpcs))

  @classmethod
  def wait_all(cls, rpcs):
    """Wait until all RPCs passed in are finished.

    This mimics UserRPC.wait_all().

    Args:
      rpcs: A list of UserRPC and MultiRpc objects.
    """
    apiproxy_stub_map.UserRPC.wait_all(cls.flatten(rpcs))
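
# Example (illustrative, not part of the original source): issuing two
# asynchronous operations on a connection and combining them with MultiRpc.
# 'conn', 'keys1' and 'keys2' are assumed to exist.
#
#   rpc1 = conn.async_get(None, keys1)
#   rpc2 = conn.async_get(None, keys2)
#   both = MultiRpc([rpc1, rpc2])
#   both.wait()                     # blocks until all wrapped RPCs finish
#   entities = both.get_result()    # combined list of results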


class BaseConnection(object):
  """Datastore connection base class.

  NOTE: Do not instantiate this class; use Connection or
  TransactionalConnection instead.

  This is not a traditional database connection -- with App Engine, in
  the end the connection is always implicit in the process state.
  There is also no intent to be compatible with PEP 249 (Python's
  Database-API).  But it is a useful abstraction to have an explicit
  object that manages the database interaction, and especially
  transactions.  Other settings related to the App Engine datastore
  are also stored here (e.g. the RPC timeout).

  A similar class in the Java API to the App Engine datastore is
  DatastoreServiceConfig (but in Java, transaction state is always
  held by the current thread).

  To use transactions, call connection.new_transaction().  This
  returns a new connection (an instance of the TransactionalConnection
  subclass) which you should use for all operations in the
  transaction.

  This model supports multiple unrelated concurrent transactions (but
  not nested transactions as this concept is commonly understood in
  the relational database world).

  When the transaction is done, call .commit() or .rollback() on the
  transactional connection.  If .commit() returns False, the
  transaction failed and none of your operations made it to the
  datastore; if it returns True, all your operations were committed.
  The transactional connection cannot be used once .commit() or
  .rollback() is called.

  Transactions are created lazily.  The first operation that requires
  a transaction handle will issue the low-level BeginTransaction
  request and wait for it to return.

  Transactions keep track of the entity group.  All operations within
  a transaction must use the same entity group.  An entity group
  (currently) comprises an app id, a namespace, and a top-level key (a
  kind and an id or name).  The first operation performed determines
  the entity group.  There is some special-casing when the first
  operation is a put() of an entity with an incomplete key; in this
  case the entity group is determined after the operation returns.

  NOTE: the datastore stubs in the dev_appserver currently support
  only a single concurrent transaction.  Specifically, the (old) file
  stub locks up if an attempt is made to start a new transaction while
  a transaction is already in use, whereas the sqlite stub fails an
  assertion.
  """

  UNKNOWN_DATASTORE = 0
  MASTER_SLAVE_DATASTORE = 1
  HIGH_REPLICATION_DATASTORE = 2

  @_positional(1)
  def __init__(self, adapter=None, config=None):
    """Constructor.

    All arguments should be specified as keyword arguments.

    Args:
      adapter: Optional AbstractAdapter subclass instance;
        default IdentityAdapter.
      config: Optional Configuration object.
    """
    if adapter is None:
      adapter = IdentityAdapter()
    if not isinstance(adapter, AbstractAdapter):
      raise datastore_errors.BadArgumentError(
          'invalid adapter argument (%r)' % (adapter,))
    self.__adapter = adapter

    if config is None:
      config = Configuration()
    elif not Configuration.is_configuration(config):
      raise datastore_errors.BadArgumentError(
          'invalid config argument (%r)' % (config,))
    self.__config = config

    self.__pending_rpcs = set()

  @property
  def adapter(self):
    """The adapter used by this connection."""
    return self.__adapter

  @property
  def config(self):
    """The default configuration used by this connection."""
    return self.__config

  def _add_pending(self, rpc):
    """Add an RPC object to the set of pending RPCs.

    The argument must be a UserRPC object, not a MultiRpc object.
    """
    assert not isinstance(rpc, MultiRpc)
    self.__pending_rpcs.add(rpc)

  def _remove_pending(self, rpc):
    """Remove an RPC object from the set of pending RPCs.

    If the argument is a MultiRpc object, the wrapped RPCs are removed
    from the set of pending RPCs.
    """
    if isinstance(rpc, MultiRpc):

      for wrapped_rpc in rpc._MultiRpc__rpcs:
        self._remove_pending(wrapped_rpc)
    else:
      try:
        self.__pending_rpcs.remove(rpc)
      except KeyError:
        # The RPC may have been removed already (e.g. by a result hook).
        pass

  def is_pending(self, rpc):
    """Check whether an RPC object is currently pending.

    Note that 'pending' in this context refers to an RPC associated
    with this connection for which _remove_pending() hasn't been
    called yet; normally this is called by check_rpc_success() which
    itself is called by the various result hooks.  A pending RPC may
    be in the RUNNING or FINISHING state.

    If the argument is a MultiRpc object, this returns true if at least
    one of its wrapped RPCs is pending.
    """
    if isinstance(rpc, MultiRpc):
      for wrapped_rpc in rpc._MultiRpc__rpcs:
        if self.is_pending(wrapped_rpc):
          return True
      return False
    else:
      return rpc in self.__pending_rpcs

  def get_pending_rpcs(self):
    """Return (a copy of) the set of currently pending RPCs."""
    return set(self.__pending_rpcs)

  def get_datastore_type(self, app=None):
    """Tries to get the datastore type for the given app.

    This function is only guaranteed to return something other than
    UNKNOWN_DATASTORE when running in production and querying the
    current app.
    """
    return _GetDatastoreType(app)

  def wait_for_all_pending_rpcs(self):
    """Wait for all currently pending RPCs to complete."""
    while self.__pending_rpcs:
      try:
        rpc = apiproxy_stub_map.UserRPC.wait_any(self.__pending_rpcs)
      except Exception:

        logging.info('wait_for_all_pending_rpcs(): exception in wait_any()',
                     exc_info=True)
        continue
      if rpc is None:
        logging.debug('wait_any() returned None')
        continue
      assert rpc.state == apiproxy_rpc.RPC.FINISHING
      if rpc in self.__pending_rpcs:

        try:
          self.check_rpc_success(rpc)
        except Exception:

          logging.info('wait_for_all_pending_rpcs(): '
                       'exception in check_rpc_success()',
                       exc_info=True)

  def create_rpc(self, config=None, service_name='datastore_v3'):
    """Create an RPC object using the configuration parameters.

    Args:
      config: Optional Configuration object.
      service_name: Optional datastore service name.

    Returns:
      A new UserRPC object with the designated settings.

    NOTES:

    (1) The RPC object returned can only be used to make a single call
        (for details see apiproxy_stub_map.UserRPC).

    (2) To make a call, use one of the specific methods on the
        Connection object, such as conn.put(entities).  This sends the
        call to the server but does not wait.  To wait for the call to
        finish and get the result, call rpc.get_result().
    """
    deadline = Configuration.deadline(config, self.__config)
    on_completion = Configuration.on_completion(config, self.__config)
    callback = None
    if on_completion is not None:

      def callback():
        # 'rpc' is bound below, before the callback can be invoked
        # (which only happens once the RPC completes).
        return on_completion(rpc)
    rpc = apiproxy_stub_map.UserRPC(service_name, deadline, callback)
    return rpc
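
  # Example (illustrative, not part of the original source): a UserRPC
  # created here may itself be passed as the 'config' argument of a single
  # asynchronous call, in which case that RPC object is used instead of a
  # newly created one.
  #
  #   rpc = conn.create_rpc(Configuration(deadline=1))
  #   conn.async_get(rpc, keys)      # issues the Get on this RPC object
  #   entities = rpc.get_result()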

  def _set_request_read_policy(self, request, config=None):
    """Set the read policy on a request.

    This takes the read policy from the config argument or the
    connection's default configuration, and if it is
    EVENTUAL_CONSISTENCY, sets the failover_ms field in the protobuf.

    Args:
      request: A protobuf with failover_ms and strong fields.
      config: Optional Configuration object.
    """
    if not (hasattr(request, 'set_failover_ms') and hasattr(request, 'strong')):
      raise datastore_errors.BadRequestError(
          'read_policy is only supported on read operations.')
    if isinstance(config, apiproxy_stub_map.UserRPC):
      read_policy = getattr(config, 'read_policy', None)
    else:
      read_policy = Configuration.read_policy(config)

    if read_policy is None:
      read_policy = self.__config.read_policy

    if read_policy == Configuration.APPLY_ALL_JOBS_CONSISTENCY:
      request.set_strong(True)
    elif read_policy == Configuration.EVENTUAL_CONSISTENCY:
      request.set_strong(False)

      request.set_failover_ms(-1)

  def _set_request_transaction(self, request):
    """Set the current transaction on a request.

    NOTE: This version of the method does nothing.  The version
    overridden by TransactionalConnection is the real thing.

    Args:
      request: A protobuf with a transaction field.

    Returns:
      A datastore_pb.Transaction object or None.
    """
    return None

  def make_rpc_call(self, config, method, request, response,
                    get_result_hook=None, user_data=None,
                    service_name='datastore_v3'):
    """Make an RPC call.

    Except for the added config argument, this is a thin wrapper
    around UserRPC.make_call().

    Args:
      config: A Configuration object or None.  Defaults are taken from
        the connection's default configuration.
      method: The method name.
      request: The request protocol buffer.
      response: The response protocol buffer.
      get_result_hook: Optional get-result hook function.  If not None,
        this must be a function with exactly one argument, the RPC
        object (self).  Its return value is returned from get_result().
      user_data: Optional additional arbitrary data for the get-result
        hook function.  This can be accessed as rpc.user_data.  The
        type of this value is up to the service module.
      service_name: Optional datastore service name.

    Returns:
      The UserRPC object used for the call.
    """

    if isinstance(config, apiproxy_stub_map.UserRPC):
      rpc = config
    else:
      rpc = self.create_rpc(config, service_name)
    rpc.make_call(method, request, response, get_result_hook, user_data)
    self._add_pending(rpc)
    return rpc

  def check_rpc_success(self, rpc):
    """Check for RPC success and translate exceptions.

    This wraps rpc.check_success() and should be called instead of that.

    This also removes the RPC from the set of pending RPCs, once it
    has completed.

    Args:
      rpc: A UserRPC or MultiRpc object.

    Raises:
      Nothing if the call succeeded; various datastore_errors.Error
      subclasses if ApplicationError was raised by rpc.check_success().
    """
    try:
      rpc.wait()
    finally:

      self._remove_pending(rpc)
    try:
      rpc.check_success()
    except apiproxy_errors.ApplicationError, err:
      raise _ToDatastoreError(err)

  MAX_RPC_BYTES = 1024 * 1024
  MAX_GET_KEYS = 1000
  MAX_PUT_ENTITIES = 500
  MAX_DELETE_KEYS = 500
  MAX_ALLOCATE_IDS_KEYS = 500

  DEFAULT_MAX_ENTITY_GROUPS_PER_RPC = 10

  def __get_max_entity_groups_per_rpc(self, config):
    """Internal helper: figures out max_entity_groups_per_rpc for the config."""
    return Configuration.max_entity_groups_per_rpc(
        config, self.__config) or self.DEFAULT_MAX_ENTITY_GROUPS_PER_RPC

  def _extract_entity_group(self, value):
    """Internal helper: extracts the entity group from a key or entity."""
    if isinstance(value, entity_pb.EntityProto):
      value = value.key()
    return value.path().element(0)

  def _map_and_group(self, values, map_fn, group_fn):
    """Internal helper: map values to keys and group them by key.

    Here 'key' is any object derived from an input value by map_fn, and
    which can be grouped by group_fn.

    Args:
      values: The values to be grouped by applying group_fn(map_fn(value)).
      map_fn: A function that maps a value to a key to be grouped.
      group_fn: A function that groups the keys output by map_fn.

    Returns:
      A list where each element is a list of (key, index) pairs.  Here
      index is the location of the value from which the key was derived in
      the original list.
    """
    indexed_key_groups = collections.defaultdict(list)
    for index, value in enumerate(values):
      key = map_fn(value)
      indexed_key_groups[group_fn(key)].append((key, index))
    return indexed_key_groups.values()
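
  # Example (illustrative, not part of the original source): with
  # map_fn = str.upper and group_fn = lambda k: k[0], the values
  # ['ab', 'ac', 'bd'] map to keys 'AB', 'AC', 'BD' and group into
  # [[('AB', 0), ('AC', 1)], [('BD', 2)]] (group order is not guaranteed).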

  def __group_indexed_pbs_by_entity_group(self, values, to_ref):
    """Internal helper: group pbs by entity group.

    Args:
      values: The values to be grouped by entity group.
      to_ref: A function that translates a value to a Reference pb.

    Returns:
      A list where each element is a list of (pb, index) pairs.  Here index
      is the location of the value from which pb was derived in the original
      list.
    """
    def get_entity_group(ref):
      eg = self._extract_entity_group(ref)

      return (eg.type(), eg.id() or eg.name() or ('new', id(eg)))

    return self._map_and_group(values, to_ref, get_entity_group)

  def __create_result_index_pairs(self, indexes):
    """Internal helper: build a function that ties an index with each result.

    Args:
      indexes: A list of integers.  A value x at location y in the list means
        that the result at location y in the result list needs to be at
        location x in the list of results returned to the user.
    """

    def create_result_index_pairs(results):
      return zip(results, indexes)
    return create_result_index_pairs

  def __sort_result_index_pairs(self, extra_hook):
    """Builds a function that sorts the indexed results.

    Args:
      extra_hook: A function that the returned function will apply to its
        result before returning.

    Returns:
      A function that takes a list of results and reorders them to match the
      order in which the input values associated with each result were
      originally provided.
    """

    def sort_result_index_pairs(result_index_pairs):
      results = [None] * len(result_index_pairs)
      for result, index in result_index_pairs:
        results[index] = result
      if extra_hook is not None:
        results = extra_hook(results)
      return results
    return sort_result_index_pairs

  def _generate_pb_lists(self, grouped_values, base_size, max_count,
                         max_groups, config):
    """Internal helper: repeatedly yield 2-element tuples of parallel lists.

    Args:
      grouped_values: A list of lists.  The inner lists consist of objects
        grouped by e.g. entity group or id sequence.
      base_size: An integer representing the base size of an rpc.  Used for
        splitting operations across multiple RPCs due to size limitations.
      max_count: An integer representing the maximum number of objects we can
        send in an rpc.  Used for splitting operations across multiple RPCs.
      max_groups: An integer representing the maximum number of groups we can
        have represented in an rpc.  Can be None, in which case no constraint
        is applied.
      config: The config object, defining max rpc size in bytes.

    Yields:
      Repeatedly yields 2-element tuples.  The first element is a list of
      protobufs to send in one batch.  The second element is a list
      containing the original location of those protobufs (expressed as an
      index) in the input.
    """
    max_size = (Configuration.max_rpc_bytes(config, self.__config) or
                self.MAX_RPC_BYTES)
    pbs = []
    pb_indexes = []
    size = base_size
    num_groups = 0
    for indexed_pbs in grouped_values:
      num_groups += 1
      if max_groups is not None and num_groups > max_groups:
        yield (pbs, pb_indexes)
        pbs = []
        pb_indexes = []
        size = base_size
        num_groups = 1
      for indexed_pb in indexed_pbs:
        (pb, index) = indexed_pb

        incr_size = pb.lengthString(pb.ByteSize()) + 1

        if (not isinstance(config, apiproxy_stub_map.UserRPC) and
            (len(pbs) >= max_count or (pbs and size + incr_size > max_size))):
          yield (pbs, pb_indexes)
          pbs = []
          pb_indexes = []
          size = base_size
          num_groups = 1
        pbs.append(pb)
        pb_indexes.append(index)
        size += incr_size
    yield (pbs, pb_indexes)

  def _get_base_size(self, base_req):
    """Internal helper: return request size in bytes."""
    return base_req.ByteSize()

  def get(self, keys):
    """Synchronous Get operation.

    Args:
      keys: An iterable of user-level key objects.

    Returns:
      A list of user-level entity objects and None values, corresponding
      1:1 to the argument keys.  A None means there is no entity for the
      corresponding key.
    """
    return self.async_get(None, keys).get_result()

  def async_get(self, config, keys, extra_hook=None):
    """Asynchronous Get operation.

    Args:
      config: A Configuration object or None.  Defaults are taken from
        the connection's default configuration.
      keys: An iterable of user-level key objects.
      extra_hook: Optional function to be called on the result once the
        RPC has completed.

    Returns:
      A MultiRpc object.
    """

    def make_get_call(req, pbs, user_data=None):
      req.key_list().extend(pbs)
      self._set_request_transaction(req)
      resp = datastore_pb.GetResponse()
      return self.make_rpc_call(config, 'Get', req, resp,
                                self.__get_hook, user_data)

    base_req = datastore_pb.GetRequest()
    self._set_request_read_policy(base_req, config)

    if isinstance(config, apiproxy_stub_map.UserRPC) or len(keys) <= 1:
      pbs = [self.__adapter.key_to_pb(key) for key in keys]
      return make_get_call(base_req, pbs, extra_hook)

    base_size = self._get_base_size(base_req)
    max_count = (Configuration.max_get_keys(config, self.__config) or
                 self.MAX_GET_KEYS)

    if base_req.has_strong():
      is_read_current = base_req.strong()
    else:
      is_read_current = (self.get_datastore_type() ==
                         BaseConnection.HIGH_REPLICATION_DATASTORE)

    indexed_keys_by_entity_group = self.__group_indexed_pbs_by_entity_group(
        keys, self.__adapter.key_to_pb)

    if is_read_current and not base_req.has_transaction():
      max_egs_per_rpc = self.__get_max_entity_groups_per_rpc(config)
    else:
      max_egs_per_rpc = None

    pbsgen = self._generate_pb_lists(
        indexed_keys_by_entity_group, base_size, max_count, max_egs_per_rpc,
        config)

    rpcs = []
    for pbs, indexes in pbsgen:
      req = datastore_pb.GetRequest()
      req.CopyFrom(base_req)
      rpcs.append(make_get_call(req, pbs,
                                self.__create_result_index_pairs(indexes)))
    return MultiRpc(rpcs, self.__sort_result_index_pairs(extra_hook))

  def __get_hook(self, rpc):
    """Internal method used as get_result_hook for the Get operation."""
    self.check_rpc_success(rpc)
    entities = []
    for group in rpc.response.entity_list():
      if group.has_entity():
        entity = self.__adapter.pb_to_entity(group.entity())
      else:
        entity = None
      entities.append(entity)
    if rpc.user_data is not None:
      entities = rpc.user_data(entities)
    return entities
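
  # Example (illustrative, not part of the original source): synchronous and
  # asynchronous Get against a connection.  'conn' and 'keys' are assumed to
  # exist; results line up 1:1 with the argument keys, with None for keys
  # that have no entity.
  #
  #   entities = conn.get(keys)
  #
  #   rpc = conn.async_get(None, keys)
  #   ...                            # do other work while the RPC runs
  #   entities = rpc.get_result()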

  def get_indexes(self):
    """Synchronous get indexes operation.

    Returns:
      A user-level representation of the indexes.
    """
    return self.async_get_indexes(None).get_result()

  def async_get_indexes(self, config, extra_hook=None, _app=None):
    """Asynchronous get indexes operation.

    Args:
      config: A Configuration object or None.  Defaults are taken from
        the connection's default configuration.
      extra_hook: Optional function to be called once the RPC has completed.

    Returns:
      A UserRPC object.
    """
    req = api_base_pb.StringProto()
    req.set_value(datastore_types.ResolveAppId(_app))
    resp = datastore_pb.CompositeIndices()
    return self.make_rpc_call(config, 'GetIndices', req, resp,
                              self.__get_indexes_hook, extra_hook)

  def __get_indexes_hook(self, rpc):
    """Internal method used as get_result_hook for the GetIndices operation."""
    self.check_rpc_success(rpc)
    indexes = [self.__adapter.pb_to_index(index)
               for index in rpc.response.index_list()]
    if rpc.user_data:
      indexes = rpc.user_data(indexes)
    return indexes

  def put(self, entities):
    """Synchronous Put operation.

    Args:
      entities: An iterable of user-level entity objects.

    Returns:
      A list of user-level key objects, corresponding 1:1 to the
      argument entities.

    NOTE: If any of the entities has an incomplete key, this will
    *not* patch up those entities with the complete key.
    """
    return self.async_put(None, entities).get_result()

  def async_put(self, config, entities, extra_hook=None):
    """Asynchronous Put operation.

    Args:
      config: A Configuration object or None.  Defaults are taken from
        the connection's default configuration.
      entities: An iterable of user-level entity objects.
      extra_hook: Optional function to be called on the result once the
        RPC has completed.

    Returns:
      A MultiRpc object.

    NOTE: If any of the entities has an incomplete key, this will
    *not* patch up those entities with the complete key.
    """

    def make_put_call(req, pbs, user_data=None):
      req.entity_list().extend(pbs)
      self._set_request_transaction(req)
      resp = datastore_pb.PutResponse()
      return self.make_rpc_call(config, 'Put', req, resp,
                                self.__put_hook, user_data)

    base_req = datastore_pb.PutRequest()
    if Configuration.force_writes(config, self.__config):
      base_req.set_force(True)

    if isinstance(config, apiproxy_stub_map.UserRPC) or len(entities) <= 1:
      pbs = [self.__adapter.entity_to_pb(entity) for entity in entities]
      return make_put_call(base_req, pbs, extra_hook)

    base_size = self._get_base_size(base_req)
    max_count = (Configuration.max_put_entities(config, self.__config) or
                 self.MAX_PUT_ENTITIES)
    rpcs = []
    indexed_entities_by_entity_group = self.__group_indexed_pbs_by_entity_group(
        entities, self.__adapter.entity_to_pb)
    if not base_req.has_transaction():
      max_egs_per_rpc = self.__get_max_entity_groups_per_rpc(config)
    else:
      max_egs_per_rpc = None

    pbsgen = self._generate_pb_lists(
        indexed_entities_by_entity_group, base_size, max_count,
        max_egs_per_rpc, config)

    for pbs, indexes in pbsgen:
      req = datastore_pb.PutRequest()
      req.CopyFrom(base_req)
      rpcs.append(make_put_call(req, pbs,
                                self.__create_result_index_pairs(indexes)))
    return MultiRpc(rpcs, self.__sort_result_index_pairs(extra_hook))

  def __put_hook(self, rpc):
    """Internal method used as get_result_hook for the Put operation."""
    self.check_rpc_success(rpc)
    keys = [self.__adapter.pb_to_key(pb)
            for pb in rpc.response.key_list()]

    if rpc.user_data is not None:
      keys = rpc.user_data(keys)
    return keys
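
  # Example (illustrative, not part of the original source): a Put whose
  # entities span many entity groups may be split into several concurrent
  # RPCs (see max_entity_groups_per_rpc); the returned MultiRpc hides the
  # batching and reassembles results in the original order.
  #
  #   rpc = conn.async_put(None, entities)
  #   keys = rpc.get_result()        # corresponds 1:1 to 'entities'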

  def delete(self, keys):
    """Synchronous Delete operation.

    Args:
      keys: An iterable of user-level key objects.

    Returns:
      None.
    """
    return self.async_delete(None, keys).get_result()

  def async_delete(self, config, keys, extra_hook=None):
    """Asynchronous Delete operation.

    Args:
      config: A Configuration object or None.  Defaults are taken from
        the connection's default configuration.
      keys: An iterable of user-level key objects.
      extra_hook: Optional function to be called once the RPC has completed.

    Returns:
      A MultiRpc object.
    """

    def make_delete_call(req, pbs, user_data=None):
      req.key_list().extend(pbs)
      self._set_request_transaction(req)
      resp = datastore_pb.DeleteResponse()
      return self.make_rpc_call(config, 'Delete', req, resp,
                                self.__delete_hook, user_data)

    base_req = datastore_pb.DeleteRequest()
    if Configuration.force_writes(config, self.__config):
      base_req.set_force(True)

    if isinstance(config, apiproxy_stub_map.UserRPC) or len(keys) <= 1:
      pbs = [self.__adapter.key_to_pb(key) for key in keys]
      return make_delete_call(base_req, pbs, extra_hook)

    base_size = self._get_base_size(base_req)
    max_count = (Configuration.max_delete_keys(config, self.__config) or
                 self.MAX_DELETE_KEYS)
    if not base_req.has_transaction():
      max_egs_per_rpc = self.__get_max_entity_groups_per_rpc(config)
    else:
      max_egs_per_rpc = None
    indexed_keys_by_entity_group = self.__group_indexed_pbs_by_entity_group(
        keys, self.__adapter.key_to_pb)

    pbsgen = self._generate_pb_lists(
        indexed_keys_by_entity_group, base_size, max_count, max_egs_per_rpc,
        config)
    rpcs = []
    for pbs, _ in pbsgen:
      req = datastore_pb.DeleteRequest()
      req.CopyFrom(base_req)
      rpcs.append(make_delete_call(req, pbs))
    return MultiRpc(rpcs, extra_hook)

  def __delete_hook(self, rpc):
    """Internal method used as get_result_hook for the Delete operation."""
    self.check_rpc_success(rpc)
    if rpc.user_data is not None:
      rpc.user_data(None)

  def begin_transaction(self, app):
    """Synchronous BeginTransaction operation.

    NOTE: In most cases the new_transaction() method is preferred,
    since that returns a TransactionalConnection object which will
    begin the transaction lazily.

    Args:
      app: Application ID.

    Returns:
      A datastore_pb.Transaction object.
    """
    return self.async_begin_transaction(None, app).get_result()

  def async_begin_transaction(self, config, app):
    """Asynchronous BeginTransaction operation.

    Args:
      config: A Configuration object or None.  Defaults are taken from
        the connection's default configuration.
      app: Application ID.

    Returns:
      A UserRPC object.
    """
    if not isinstance(app, basestring) or not app:
      raise datastore_errors.BadArgumentError(
          'begin_transaction requires an application id argument (%r)' %
          (app,))
    req = datastore_pb.BeginTransactionRequest()
    req.set_app(app)
    if TransactionOptions.xg(config, self.__config):
      req.set_allow_multiple_eg(True)
    resp = datastore_pb.Transaction()
    rpc = self.make_rpc_call(config, 'BeginTransaction', req, resp,
                             self.__begin_transaction_hook)
    return rpc

  def __begin_transaction_hook(self, rpc):
    """Internal method used as get_result_hook for BeginTransaction."""
    self.check_rpc_success(rpc)
    return rpc.response


class Connection(BaseConnection):
  """Transaction-less connection class.

  This contains those operations that are not allowed on transactional
  connections.  (Currently only allocate_ids and reserve_key_ids.)
  """

  @_positional(1)
  def __init__(self, adapter=None, config=None):
    """Constructor.

    All arguments should be specified as keyword arguments.

    Args:
      adapter: Optional AbstractAdapter subclass instance;
        default IdentityAdapter.
      config: Optional Configuration object.
    """
    super(Connection, self).__init__(adapter=adapter, config=config)
    self.__adapter = self.adapter
    self.__config = self.config

  def new_transaction(self, config=None):
    """Create a new transactional connection based on this one.

    This is different from, and usually preferred over, the
    begin_transaction() method; new_transaction() returns a new
    TransactionalConnection object.

    Args:
      config: A configuration object for the new connection, merged
        with this connection's config.
    """
    config = self.__config.merge(config)
    return TransactionalConnection(adapter=self.__adapter, config=config)
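
  # Example (illustrative, not part of the original source): typical
  # transactional usage, per the BaseConnection docstring above.
  #
  #   txn = conn.new_transaction()
  #   entity = txn.get([key])[0]
  #   ...                            # modify entity
  #   txn.put([entity])
  #   if not txn.commit():
  #     ...                          # commit failed; nothing was written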

  def __to_v4_key(self, ref):
    """Convert a valid v3 Reference pb to a v4 Key pb."""
    key = entity_v4_pb.Key()
    key.mutable_partition_id().set_dataset_id(ref.app())
    if ref.name_space():
      key.mutable_partition_id().set_namespace(ref.name_space())
    for el_v3 in ref.path().element_list():
      el_v4 = key.add_path_element()
      el_v4.set_kind(el_v3.type())
      if el_v3.has_id():
        el_v4.set_id(el_v3.id())
      if el_v3.has_name():
        el_v4.set_name(el_v3.name())
    return key

  def __to_v3_reference(self, key):
    """Convert a valid v4 Key pb to a v3 Reference pb."""
    ref = entity_pb.Reference()
    ref.set_app(key.partition_id().dataset_id())
    if key.partition_id().has_namespace():
      ref.set_name_space(key.partition_id().namespace())
    for el_v4 in key.path_element_list():
      el_v3 = ref.mutable_path().add_element()
      el_v3.set_type(el_v4.kind())
      if el_v4.has_id():
        el_v3.set_id(el_v4.id())
      if el_v4.has_name():
        el_v3.set_name(el_v4.name())
    return ref

  def allocate_ids(self, key, size=None, max=None):
    """Synchronous AllocateIds operation.

    Exactly one of size and max must be specified.

    Args:
      key: A user-level key object.
      size: Optional number of IDs to allocate.
      max: Optional maximum ID to allocate.

    Returns:
      A pair (start, end) giving the (inclusive) range of IDs allocated.
    """
    return self.async_allocate_ids(None, key, size, max).get_result()

  def async_allocate_ids(self, config, key, size=None, max=None,
                         extra_hook=None):
    """Asynchronous AllocateIds operation.

    Args:
      config: A Configuration object or None. Defaults are taken from
        the connection's default configuration.
      key: A user-level key object.
      size: Optional number of IDs to allocate.
      max: Optional maximum ID to allocate.
      extra_hook: Optional function to be called on the result once the
        RPC has completed.

    Returns:
      A MultiRpc object.
    """
    if size is not None:
      if max is not None:
        raise datastore_errors.BadArgumentError(
            'Cannot allocate ids using both size and max')
      if not isinstance(size, (int, long)):
        raise datastore_errors.BadArgumentError('Invalid size (%r)' % (size,))
      if size > _MAX_ID_BATCH_SIZE:
        raise datastore_errors.BadArgumentError(
            'Cannot allocate more than %s ids at a time; received %s'
            % (_MAX_ID_BATCH_SIZE, size))
      if size <= 0:
        raise datastore_errors.BadArgumentError(
            'Cannot allocate less than 1 id; received %s' % size)
    if max is not None:
      if not isinstance(max, (int, long)):
        raise datastore_errors.BadArgumentError('Invalid max (%r)' % (max,))
      if max < 0:
        raise datastore_errors.BadArgumentError(
            'Cannot allocate a range with a max less than 0; received %s' %
            max)
    req = datastore_pb.AllocateIdsRequest()
    req.mutable_model_key().CopyFrom(self.__adapter.key_to_pb(key))
    if size is not None:
      req.set_size(size)
    if max is not None:
      req.set_max(max)
    resp = datastore_pb.AllocateIdsResponse()
    rpc = self.make_rpc_call(config, 'AllocateIds', req, resp,
                             self.__allocate_ids_hook, extra_hook)
    return rpc

  def __allocate_ids_hook(self, rpc):
    """Internal method used as get_result_hook for AllocateIds."""
    self.check_rpc_success(rpc)
    pair = rpc.response.start(), rpc.response.end()
    if rpc.user_data is not None:
      pair = rpc.user_data(pair)
    return pair
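
  # Illustrative usage sketch (not part of the original module); 'conn' and
  # 'key' are hypothetical placeholders for a Connection and a user-level
  # key identifying the ID sequence to draw from:
  #
  #   start, end = conn.allocate_ids(key, size=10)  # ten fresh ids
  #
  #   rpc = conn.async_allocate_ids(None, key, max=5000)
  #   start, end = rpc.get_result()  # ids up to 5000 now reserved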

  def _reserve_keys(self, keys):
    """Synchronous AllocateIds operation to reserve the given keys.

    Sends one or more v4 AllocateIds rpcs with keys to reserve.
    Reserved keys must be complete and must have valid ids.

    Args:
      keys: Iterable of user-level keys.
    """
    self._async_reserve_keys(None, keys).get_result()

  def _async_reserve_keys(self, config, keys, extra_hook=None):
    """Asynchronous AllocateIds operation to reserve the given keys.

    Sends one or more v4 AllocateIds rpcs with keys to reserve.
    Reserved keys must be complete and must have valid ids.

    Args:
      config: A Configuration object or None to use Connection default.
      keys: Iterable of user-level keys.
      extra_hook: Optional function to be called on the rpc result.

    Returns:
      A MultiRpc object whose result is None, or the result of the
      user-supplied extra_hook.
    """
    def to_id_key(key):
      if key.path().element_size() == 1:
        return 'root_idkey'
      else:
        eg = self._extract_entity_group(key)
        return (eg.type(), eg.id() or eg.name())

    keys_by_idkey = self._map_and_group(keys, self.__adapter.key_to_pb,
                                        to_id_key)
    max_count = (Configuration.max_allocate_ids_keys(config, self.__config) or
                 self.MAX_ALLOCATE_IDS_KEYS)

    rpcs = []
    pbsgen = self._generate_pb_lists(keys_by_idkey, 0, max_count, None, config)
    for pbs, _ in pbsgen:
      req = datastore_v4a_pb.AllocateIdsRequest()
      req.reserve_list().extend([self.__to_v4_key(key) for key in pbs])
      resp = datastore_v4a_pb.AllocateIdsResponse()
      rpcs.append(self.make_rpc_call(config, 'AllocateIds', req, resp,
                                     self.__reserve_keys_hook, extra_hook,
                                     'datastore_v4'))
    return MultiRpc(rpcs)

  def __reserve_keys_hook(self, rpc):
    """Internal get_result_hook for _reserve_keys."""
    self.check_rpc_success(rpc)
    if rpc.user_data is not None:
      return rpc.user_data(rpc.response)
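
  # Illustrative usage sketch (not part of the original module); 'conn' and
  # 'imported_keys' are hypothetical placeholders. Reserving keys whose
  # numeric ids were assigned elsewhere keeps the id allocator from handing
  # those ids out again:
  #
  #   conn._reserve_keys(imported_keys)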


class TransactionOptions(Configuration):
  """An immutable class that contains options for a transaction."""

  NESTED = 1
  """Create a nested transaction under an existing one."""

  MANDATORY = 2
  """Always propagate an existing transaction; throw an exception if there
  is no existing transaction."""

  ALLOWED = 3
  """If there is an existing transaction, propagate it."""

  INDEPENDENT = 4
  """Always use a new transaction, pausing any existing transactions."""

  _PROPAGATION = frozenset((NESTED, MANDATORY, ALLOWED, INDEPENDENT))

  @ConfigOption
  def propagation(value):
    """How existing transactions should be handled.

    One of NESTED, MANDATORY, ALLOWED, INDEPENDENT. The interpretation of
    these types is up to higher level run-in-transaction implementations.

    WARNING: Using anything other than NESTED for the propagation flag
    can have strange consequences. When using ALLOWED or MANDATORY, if
    an exception is raised, the transaction is likely not safe to
    commit. When using INDEPENDENT it is not generally safe to return
    values read to the caller (as they were not read in the caller's
    transaction).

    Raises:
      datastore_errors.BadArgumentError if value is not recognized.
    """
    if value not in TransactionOptions._PROPAGATION:
      raise datastore_errors.BadArgumentError(
          'Unknown propagation value (%r)' % (value,))
    return value

  @ConfigOption
  def xg(value):
    """Whether to allow cross-group transactions.

    Raises:
      datastore_errors.BadArgumentError if value is not a bool.
    """
    if not isinstance(value, bool):
      raise datastore_errors.BadArgumentError(
          'xg argument should be bool (%r)' % (value,))
    return value

  @ConfigOption
  def retries(value):
    """How many retries to attempt on the transaction.

    The exact retry logic is implemented in higher level run-in-transaction
    implementations.

    Raises:
      datastore_errors.BadArgumentError if value is not an integer or is
      negative.
    """
    datastore_types.ValidateInteger(value,
                                    'retries',
                                    datastore_errors.BadArgumentError,
                                    zero_ok=True)
    return value

  @ConfigOption
  def app(value):
    """The application in which to perform the transaction.

    Raises:
      datastore_errors.BadArgumentError if value is not a string
      or is the empty string.
    """
    datastore_types.ValidateString(value,
                                   'app',
                                   datastore_errors.BadArgumentError)
    return value
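
# Illustrative usage sketch (not part of the original module): composing
# transaction options as keyword arguments, the same way other
# Configuration subclasses are built. 'my_adapter' is a hypothetical
# placeholder:
#
#   options = TransactionOptions(xg=True,
#                                retries=3,
#                                propagation=TransactionOptions.ALLOWED)
#   txn_conn = TransactionalConnection(adapter=my_adapter, config=options)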


class TransactionalConnection(BaseConnection):
  """A connection specific to one transaction.

  It is possible to pass the transaction and entity group to the
  constructor, but typically the transaction is begun asynchronously by
  the constructor and its result is retrieved lazily by the transaction
  property when the first operation needs it.
  """

  @_positional(1)
  def __init__(self,
               adapter=None, config=None, transaction=None, entity_group=None):
    """Constructor.

    All arguments should be specified as keyword arguments.

    Args:
      adapter: Optional AbstractAdapter subclass instance;
        default IdentityAdapter.
      config: Optional Configuration object.
      transaction: Optional datastore_pb.Transaction object.
      entity_group: Deprecated, do not use.
    """
    super(TransactionalConnection, self).__init__(adapter=adapter,
                                                  config=config)
    self.__adapter = self.adapter
    if transaction is None:
      app = datastore_types.ResolveAppId(TransactionOptions.app(self.config))
      self.__transaction_rpc = self.async_begin_transaction(None, app)
    else:
      if not isinstance(transaction, datastore_pb.Transaction):
        raise datastore_errors.BadArgumentError(
            'Invalid transaction (%r)' % (transaction,))
      self.__transaction = transaction
      self.__transaction_rpc = None
    self.__finished = False

  def _get_base_size(self, base_req):
    """Internal helper: return size in bytes plus room for transaction."""
    return (super(TransactionalConnection, self)._get_base_size(base_req) +
            self.transaction.lengthString(self.transaction.ByteSize()) + 1)

  @property
  def finished(self):
    return self.__finished

  @property
  def transaction(self):
    if self.__transaction_rpc is not None:
      self.__transaction = self.__transaction_rpc.get_result()
      self.__transaction_rpc = None
    return self.__transaction

  def _set_request_transaction(self, request):
    """Set the current transaction on a request.

    This reads the transaction property (see above). The transaction
    object returned is both set as the transaction field on the request
    object and returned.

    Args:
      request: A protobuf with a transaction field.

    Returns:
      A datastore_pb.Transaction object or None.

    Raises:
      datastore_errors.BadRequestError if the transaction is already
      finished.
    """
    if self.__finished:
      raise datastore_errors.BadRequestError(
          'Cannot start a new operation in a finished transaction.')
    transaction = self.transaction
    request.mutable_transaction().CopyFrom(transaction)
    return transaction

  def _end_transaction(self):
    """Finish the current transaction.

    This blocks waiting for all pending RPCs to complete, and then
    marks the connection as finished. After that no more operations
    can be started using this connection.

    Returns:
      A datastore_pb.Transaction object or None.

    Raises:
      datastore_errors.BadRequestError if the transaction is already
      finished.
    """
    if self.__finished:
      raise datastore_errors.BadRequestError(
          'The transaction is already finished.')

    self.wait_for_all_pending_rpcs()
    assert not self.get_pending_rpcs()
    transaction = self.transaction
    self.__finished = True
    self.__transaction = None
    return transaction

  def commit(self):
    """Synchronous Commit operation.

    Returns:
      True if the transaction was successfully committed. False if
      the backend reported a concurrent transaction error.
    """

    # create_rpc() returns a UserRPC; async_commit() passes it through to
    # make_rpc_call(), which accepts a UserRPC in place of a Configuration
    # and reuses it for the Commit call.
    rpc = self.create_rpc()
    rpc = self.async_commit(rpc)
    if rpc is None:
      return True
    return rpc.get_result()

  def async_commit(self, config):
    """Asynchronous Commit operation.

    Args:
      config: A Configuration object or None. Defaults are taken from
        the connection's default configuration.

    Returns:
      A MultiRpc object, or None if there was no transaction to commit.
    """
    transaction = self._end_transaction()
    if transaction is None:
      return None
    resp = datastore_pb.CommitResponse()
    rpc = self.make_rpc_call(config, 'Commit', transaction, resp,
                             self.__commit_hook)
    return rpc

  def __commit_hook(self, rpc):
    """Internal method used as get_result_hook for Commit."""
    try:
      rpc.check_success()
    except apiproxy_errors.ApplicationError, err:
      if err.application_error == datastore_pb.Error.CONCURRENT_TRANSACTION:
        return False
      else:
        raise _ToDatastoreError(err)
    else:
      return True

  def rollback(self):
    """Synchronous Rollback operation."""
    rpc = self.async_rollback(None)
    if rpc is None:
      return None
    return rpc.get_result()

  def async_rollback(self, config):
    """Asynchronous Rollback operation.

    Args:
      config: A Configuration object or None. Defaults are taken from
        the connection's default configuration.

    Returns:
      A MultiRpc object, or None if there was no transaction to roll back.
    """
    transaction = self._end_transaction()
    if transaction is None:
      return None
    resp = api_base_pb.VoidProto()
    rpc = self.make_rpc_call(config, 'Rollback', transaction, resp,
                             self.__rollback_hook)
    return rpc

  def __rollback_hook(self, rpc):
    """Internal method used as get_result_hook for Rollback."""
    self.check_rpc_success(rpc)
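
  # Illustrative usage sketch (not part of the original module); 'conn' and
  # 'entity' are hypothetical placeholders. A commit that loses to a
  # concurrent transaction returns False rather than raising:
  #
  #   txn_conn = conn.new_transaction()
  #   txn_conn.put([entity])
  #   if not txn_conn.commit():
  #     ...  # retry; the backend reported a concurrent transaction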


def _ToDatastoreError(err):
  """Converts an apiproxy_errors.ApplicationError to a datastore_errors error.

  Args:
    err: An apiproxy_errors.ApplicationError object.

  Returns:
    An instance of a subclass of datastore_errors.Error.
  """
  return _DatastoreExceptionFromErrorCodeAndDetail(err.application_error,
                                                   err.error_detail)


def _DatastoreExceptionFromErrorCodeAndDetail(error, detail):
  """Converts a datastore_pb.Error into a datastore_errors.Error.

  Args:
    error: A member of the datastore_pb.Error enumeration.
    detail: A string providing extra details about the error.

  Returns:
    An instance of a subclass of datastore_errors.Error.
  """
  exception_class = {
      datastore_pb.Error.BAD_REQUEST: datastore_errors.BadRequestError,
      datastore_pb.Error.CONCURRENT_TRANSACTION:
          datastore_errors.TransactionFailedError,
      datastore_pb.Error.INTERNAL_ERROR: datastore_errors.InternalError,
      datastore_pb.Error.NEED_INDEX: datastore_errors.NeedIndexError,
      datastore_pb.Error.TIMEOUT: datastore_errors.Timeout,
      datastore_pb.Error.BIGTABLE_ERROR: datastore_errors.Timeout,
      datastore_pb.Error.COMMITTED_BUT_STILL_APPLYING:
          datastore_errors.CommittedButStillApplying,
      datastore_pb.Error.CAPABILITY_DISABLED:
          apiproxy_errors.CapabilityDisabledError,
  }.get(error, datastore_errors.Error)

  if detail is None:
    return exception_class()
  else:
    return exception_class(detail)
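
# Illustrative usage sketch (not part of the original module): translating a
# raw RPC failure into the corresponding user-level exception:
#
#   err = apiproxy_errors.ApplicationError(
#       datastore_pb.Error.TIMEOUT, 'operation took too long')
#   raise _ToDatastoreError(err)  # raises datastore_errors.Timeout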