3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
21 """Asynchronous datastore API.
23 This is designed to be the lowest-level API to be used by all Python
24 datastore client libraries.
26 A refactoring is in progress to rebuild datastore.py on top of this,
27 while remaining nearly 100% backwards compatible. A new (not intended
28 to be compatible) library to replace db.py is also under development.
38 __all__
= ['AbstractAdapter',
46 'TransactionalConnection',
58 from google
.appengine
.datastore
import entity_pb
61 from google
.appengine
.api
import api_base_pb
62 from google
.appengine
.api
import apiproxy_rpc
63 from google
.appengine
.api
import apiproxy_stub_map
65 from google
.appengine
.api
import datastore_errors
66 from google
.appengine
.api
import datastore_types
68 from google
.appengine
.api
.app_identity
import app_identity
69 from google
.appengine
.datastore
import datastore_pb
70 from google
.appengine
.datastore
import datastore_v4a_pb
71 from google
.appengine
.datastore
import entity_v4_pb
72 from google
.appengine
.runtime
import apiproxy_errors
# Maximum number of ids that can be allocated in a single batch
# (one billion); larger requests must be split by the caller.
_MAX_ID_BATCH_SIZE = 1000 * 1000 * 1000
82 def _positional(max_pos_args
):
83 """A decorator to declare that only the first N arguments may be positional.
85 Note that for methods, n includes 'self'.
87 def positional_decorator(wrapped
):
88 @functools.wraps(wrapped
)
89 def positional_wrapper(*args
, **kwds
):
90 if len(args
) > max_pos_args
:
95 '%s() takes at most %d positional argument%s (%d given)' %
96 (wrapped
.__name
__, max_pos_args
, plural_s
, len(args
)))
97 return wrapped(*args
, **kwds
)
98 return positional_wrapper
99 return positional_decorator
def _GetDatastoreType(app=None):
  """Tries to get the datastore type for the given app.

  This function is only guaranteed to return something other than
  UNKNOWN_DATASTORE when running in production and querying the current app.

  Args:
    app: The app id to check; defaults to the current app.

  Returns:
    One of BaseConnection.UNKNOWN_DATASTORE,
    BaseConnection.HIGH_REPLICATION_DATASTORE or
    BaseConnection.MASTER_SLAVE_DATASTORE.
  """
  current_app = datastore_types.ResolveAppId(None)
  if app not in (current_app, None):
    # We can only inspect the app we are currently running as.
    return BaseConnection.UNKNOWN_DATASTORE

  # A non-empty partition prefix (e.g. 's~') marks a High Replication app.
  partition, _, _ = app_identity._ParseFullAppId(current_app)
  if partition:
    return BaseConnection.HIGH_REPLICATION_DATASTORE
  return BaseConnection.MASTER_SLAVE_DATASTORE
class AbstractAdapter(object):
  """Abstract interface between protobufs and user-level classes.

  This class defines conversions between the protobuf classes defined
  in entity_pb.py on the one hand, and the corresponding user-level
  classes (which are defined by higher-level API libraries such as
  datastore.py or db.py) on the other hand.

  The premise is that the code in this module is agnostic about the
  user-level classes used to represent keys and entities, while at the
  same time providing APIs that accept or return such user-level
  classes.

  Higher-level libraries must subclass this abstract class and pass an
  instance of the subclass to the Connection they want to use.

  These methods may raise datastore_errors.Error for bad inputs.
  """

  def pb_to_key(self, pb):
    """Turn an entity_pb.Reference into a user-level key."""
    raise NotImplementedError

  def pb_to_entity(self, pb):
    """Turn an entity_pb.EntityProto into a user-level entity."""
    raise NotImplementedError

  def pb_to_index(self, pb):
    """Turn an entity_pb.CompositeIndex into a user-level Index."""
    raise NotImplementedError

  def pb_to_query_result(self, pb, query_options):
    """Turn an entity_pb.EntityProto into a user-level query result.

    For keys-only queries the protobuf holds a bare key, so only the key
    is converted; otherwise the full entity is converted.
    """
    if query_options.keys_only:
      return self.pb_to_key(pb.key())
    return self.pb_to_entity(pb)

  def key_to_pb(self, key):
    """Turn a user-level key into an entity_pb.Reference."""
    raise NotImplementedError

  def entity_to_pb(self, entity):
    """Turn a user-level entity into an entity_pb.EntityProto."""
    raise NotImplementedError

  def new_key_pb(self):
    """Create a new, empty entity_pb.Reference."""
    return entity_pb.Reference()

  def new_entity_pb(self):
    """Create a new, empty entity_pb.EntityProto."""
    return entity_pb.EntityProto()
class IdentityAdapter(AbstractAdapter):
  """A concrete adapter that implements the identity mapping.

  This is used as the default when a Connection is created without
  specifying an adapter; that's primarily for testing.
  """

  def pb_to_key(self, pb):
    return pb

  def pb_to_entity(self, pb):
    return pb

  def key_to_pb(self, key):
    return key

  def entity_to_pb(self, entity):
    return entity

  def pb_to_index(self, pb):
    return pb
class ConfigOption(object):
  """A descriptor for a Configuration option.

  This class is used to create a configuration option on a class that inherits
  from BaseConfiguration. A validator function decorated with this class will
  be converted to a read-only descriptor and BaseConfiguration will implement
  constructor and merging logic for that configuration option. A validator
  function takes a single non-None value to validate and either throws
  an exception or returns that value (or an equivalent value). A validator is
  called once at construction time, but only if a non-None value for the
  configuration option is specified in the constructor's keyword arguments.
  """

  def __init__(self, validator):
    self.validator = validator

  def __get__(self, obj, objtype):
    # Accessed on the class itself: return the descriptor for introspection.
    if obj is None:
      return self
    return obj._values.get(self.validator.__name__, None)

  def __set__(self, obj, value):
    raise AttributeError('Configuration options are immutable (%s)' %
                         (self.validator.__name__,))

  def __call__(self, *args):
    """Gets the first non-None value for this option from the given args.

    Args:
      *arg: Any number of configuration objects or None values.

    Returns:
      The first value for this ConfigOption found in the given configuration
      objects, or None if not found.

    Raises:
      datastore_errors.BadArgumentError if a given in object is not a
      configuration object.
    """
    name = self.validator.__name__
    for config in args:
      if isinstance(config, (type(None), apiproxy_stub_map.UserRPC)):
        # None and UserRPC placeholders are skipped silently.
        continue
      elif not isinstance(config, BaseConfiguration):
        raise datastore_errors.BadArgumentError(
            'invalid config argument (%r)' % (config,))
      elif name in config._values and self is config._options[name]:
        return config._values[name]
    return None
class _ConfigurationMetaClass(type):
  """The metaclass for all Configuration types.

  This class is needed to store a class specific list of all ConfigOptions in
  cls._options, and insert a __slots__ variable into the class dict before the
  class is created to impose immutability.
  """

  def __new__(metaclass, classname, bases, classDict):
    if classname == '_MergedConfiguration':
      # _MergedConfiguration manages its own slots and options.
      return type.__new__(metaclass, classname, bases, classDict)

    # The root class gets a '_values' slot; subclasses inherit it and
    # must not re-declare it, so they get empty __slots__.
    if object in bases:
      classDict['__slots__'] = ['_values']
    else:
      classDict['__slots__'] = []
    cls = type.__new__(metaclass, classname, bases, classDict)
    if object not in bases:
      # Accumulate ConfigOptions from the whole MRO, most-derived last.
      options = {}
      for c in reversed(cls.__mro__):
        if '_options' in c.__dict__:
          options.update(c.__dict__['_options'])
      cls._options = options
      for option, value in cls.__dict__.iteritems():
        if isinstance(value, ConfigOption):
          if option in cls._options:
            raise TypeError('%s cannot be overridden (%s)' %
                            (option, cls.__name__))
          cls._options[option] = value
          # Remember which class declared the option, for merge conflicts.
          value._cls = cls
    return cls
290 class BaseConfiguration(object):
291 """A base class for a configuration object.
293 Subclasses should provide validation functions for every configuration option
294 they accept. Any public function decorated with ConfigOption is assumed to be
295 a validation function for an option of the same name. All validation functions
296 take a single non-None value to validate and must throw an exception or return
299 This class forces subclasses to be immutable and exposes a read-only
300 property for every accepted configuration option. Configuration options set by
301 passing keyword arguments to the constructor. The constructor and merge
302 function are designed to avoid creating redundant copies and may return
303 the configuration objects passed to them if appropriate.
305 Setting an option to None is the same as not specifying the option except in
306 the case where the 'config' argument is given. In this case the value on
307 'config' of the same name is ignored. Options that are not specified will
308 return 'None' when accessed.
311 __metaclass__
= _ConfigurationMetaClass
314 def __new__(cls
, config
=None, **kwargs
):
315 """Immutable constructor.
317 If 'config' is non-None all configuration options will default to the value
318 it contains unless the configuration option is explicitly set to 'None' in
319 the keyword arguments. If 'config' is None then all configuration options
323 config: Optional base configuration providing default values for
324 parameters not specified in the keyword arguments.
325 **kwargs: Configuration options to store on this object.
328 Either a new Configuration object or (if it would be equivalent)
329 the config argument unchanged, but never None.
333 elif isinstance(config
, BaseConfiguration
):
334 if cls
is config
.__class
__ and config
.__is
_stronger
(**kwargs
):
338 for key
, value
in config
._values
.iteritems():
340 if issubclass(cls
, config
._options
[key
]._cls
):
341 kwargs
.setdefault(key
, value
)
343 raise datastore_errors
.BadArgumentError(
344 'config argument should be Configuration (%r)' % (config
,))
346 obj
= super(BaseConfiguration
, cls
).__new
__(cls
)
348 for key
, value
in kwargs
.iteritems():
349 if value
is not None:
351 config_option
= obj
._options
[key
]
352 except KeyError, err
:
353 raise TypeError('Unknown configuration option (%s)' % err
)
354 value
= config_option
.validator(value
)
355 if value
is not None:
356 obj
._values
[key
] = value
359 def __eq__(self
, other
):
362 if not isinstance(other
, BaseConfiguration
):
363 return NotImplemented
364 return self
._options
== other
._options
and self
._values
== other
._values
366 def __ne__(self
, other
):
367 equal
= self
.__eq
__(other
)
368 if equal
is NotImplemented:
373 return (hash(frozenset(self
._values
.iteritems())) ^
374 hash(frozenset(self
._options
.iteritems())))
378 for key_value
in sorted(self
._values
.iteritems()):
379 args
.append('%s=%r' % key_value
)
380 return '%s(%s)' % (self
.__class
__.__name
__, ', '.join(args
))
382 def __is_stronger(self
, **kwargs
):
383 """Internal helper to ask whether a configuration is stronger than another.
385 A configuration is stronger when it contains every name/value pair in
388 Example: a configuration with:
389 (deadline=5, on_configuration=None, read_policy=EVENTUAL_CONSISTENCY)
391 (deadline=5, on_configuration=None)
392 but not stronger than:
393 (deadline=5, on_configuration=None, read_policy=None)
395 (deadline=10, on_configuration=None, read_policy=None).
398 - Any value is stronger than an unset value;
399 - Any value is stronger than itself.
402 True if each of the self attributes is stronger than the
403 corresponding argument.
405 for key
, value
in kwargs
.iteritems():
406 if key
not in self
._values
or value
!= self
._values
[key
]:
411 def is_configuration(cls
, obj
):
412 """True if configuration obj handles all options of this class.
414 Use this method rather than isinstance(obj, cls) to test if a
415 configuration object handles the options of cls (is_configuration
416 is handled specially for results of merge which may handle the options
417 of unrelated configuration classes).
420 obj: the object to test.
422 return isinstance(obj
, BaseConfiguration
) and obj
._is
_configuration
(cls
)
424 def _is_configuration(self
, cls
):
425 return isinstance(self
, cls
)
427 def merge(self
, config
):
428 """Merge two configurations.
430 The configuration given as an argument (if any) takes priority;
431 defaults are filled in from the current configuration.
434 config: Configuration providing overrides, or None (but cannot
438 Either a new configuration object or (if it would be equivalent)
439 self or the config argument unchanged, but never None.
442 BadArgumentError if self or config are of configurations classes
443 with conflicting options (i.e. the same option name defined in
444 two different configuration classes).
446 if config
is None or config
is self
:
452 if not (isinstance(config
, _MergedConfiguration
) or
453 isinstance(self
, _MergedConfiguration
)):
457 if isinstance(config
, self
.__class
__):
458 for key
in self
._values
:
459 if key
not in config
._values
:
463 if isinstance(self
, config
.__class
__):
464 if self
.__is
_stronger
(**config
._values
):
468 def _quick_merge(obj
):
469 obj
._values
= self
._values
.copy()
470 obj
._values
.update(config
._values
)
473 if isinstance(config
, self
.__class
__):
474 return _quick_merge(type(config
)())
475 if isinstance(self
, config
.__class
__):
476 return _quick_merge(type(self
)())
479 return _MergedConfiguration(config
, self
)
481 def __getstate__(self
):
482 return {'_values': self
._values
}
484 def __setstate__(self
, state
):
487 obj
= self
.__class
__(**state
['_values'])
488 self
._values
= obj
._values
class _MergedConfiguration(BaseConfiguration):
  """Helper class to handle merges of configurations.

  Instances of _MergedConfiguration are in some sense "subclasses" of the
  argument configurations, i.e.:
  - they handle exactly the configuration options of the argument configurations
  - the value of these options is taken in priority order from the arguments
  - isinstance is true on this configuration if it is true on any of the
    argument configurations
  This class raises an exception if two argument configurations have an option
  with the same name but coming from a different configuration class.
  """
  __slots__ = ['_values', '_configs', '_options', '_classes']

  def __new__(cls, *configs):
    obj = super(BaseConfiguration, cls).__new__(cls)
    obj._configs = configs

    # Collect options from all configs, rejecting same-name options that
    # were declared by different configuration classes.
    obj._options = {}
    for config in configs:
      for name, option in config._options.iteritems():
        if name in obj._options:
          if option is not obj._options[name]:
            error = ("merge conflict on '%s' from '%s' and '%s'" %
                     (name, option._cls.__name__,
                      obj._options[name]._cls.__name__))
            raise datastore_errors.BadArgumentError(error)
        obj._options[name] = option

    # Later configs in the argument list have lower priority, so apply
    # them first and let earlier ones overwrite.
    obj._values = {}
    for config in reversed(configs):
      for name, value in config._values.iteritems():
        obj._values[name] = value

    return obj

  def __repr__(self):
    return '%s%r' % (self.__class__.__name__, tuple(self._configs))

  def _is_configuration(self, cls):
    for config in self._configs:
      if config._is_configuration(cls):
        return True
    return False

  def __getattr__(self, name):
    if name in self._options:
      if name in self._values:
        return self._values[name]
      else:
        # Known option, but unset: behave like an unset ConfigOption.
        return None
    raise AttributeError("Configuration has no attribute '%s'" % (name,))

  def __getstate__(self):
    return {'_configs': self._configs}

  def __setstate__(self, state):
    # Rebuild via the constructor so conflict checks run again.
    obj = _MergedConfiguration(*state['_configs'])
    self._values = obj._values
    self._configs = obj._configs
    self._options = obj._options
class Configuration(BaseConfiguration):
  """Configuration parameters for datastore RPCs.

  This class reserves the right to define configuration options of any name
  except those that start with 'user_'. External subclasses should only define
  function or variables with names that start with in 'user_'.

  The options defined on this class include generic RPC parameters (deadline)
  but also datastore-specific parameters (on_completion and read_policy).

  Options are set by passing keyword arguments to the constructor corresponding
  to the configuration options defined below.
  """

  STRONG_CONSISTENCY = 0
  """A read consistency that will return up to date results."""

  EVENTUAL_CONSISTENCY = 1
  """A read consistency that allows requests to return possibly stale results.

  This read_policy tends to be faster and less prone to unavailability/timeouts.
  May return transactionally inconsistent results in rare cases.
  """

  APPLY_ALL_JOBS_CONSISTENCY = 2
  """A read consistency that aggressively tries to find write jobs to apply.

  Use of this read policy is strongly discouraged.

  This read_policy tends to be more costly and is only useful in a few specific
  cases. It is equivalent to splitting a request by entity group and wrapping
  each batch in a separate transaction. Cannot be used with non-ancestor
  queries.
  """

  ALL_READ_POLICIES = frozenset((STRONG_CONSISTENCY,
                                 EVENTUAL_CONSISTENCY,
                                 APPLY_ALL_JOBS_CONSISTENCY,
                                 ))

  @ConfigOption
  def deadline(value):
    """The deadline for any RPC issued.

    If unset the system default will be used which is typically 5 seconds.

    Raises:
      BadArgumentError if value is not a number or is less than zero.
    """
    if not isinstance(value, (int, long, float)):
      raise datastore_errors.BadArgumentError(
          'deadline argument should be int/long/float (%r)' % (value,))
    elif value <= 0:
      raise datastore_errors.BadArgumentError(
          'deadline argument should be > 0 (%r)' % (value,))
    return value

  @ConfigOption
  def on_completion(value):
    """A callback that is invoked when any RPC completes.

    If specified, it will be called with a UserRPC object as argument when an
    RPC completes.

    NOTE: There is a subtle but important difference between
    UserRPC.callback and Configuration.on_completion: on_completion is
    called with the RPC object as its first argument, where callback is
    called without arguments.  (Because a Configuration's on_completion
    function can be used with many UserRPC objects, it would be awkward
    if it was called without passing the specific RPC.)
    """
    return value

  @ConfigOption
  def read_policy(value):
    """The read policy to use for any relevent RPC.

    if unset STRONG_CONSISTENCY will be used.

    Raises:
      BadArgumentError if value is not a known read policy.
    """
    if value not in Configuration.ALL_READ_POLICIES:
      raise datastore_errors.BadArgumentError(
          'read_policy argument invalid (%r)' % (value,))
    return value

  @ConfigOption
  def force_writes(value):
    """If a write request should succeed even if the app is read-only.

    This only applies to user controlled read-only periods.
    """
    if not isinstance(value, bool):
      raise datastore_errors.BadArgumentError(
          'force_writes argument invalid (%r)' % (value,))
    return value

  @ConfigOption
  def max_entity_groups_per_rpc(value):
    """The maximum number of entity groups that can be represented in one rpc.

    For a non-transactional operation that involves more entity groups than the
    maximum, the operation will be performed by executing multiple, asynchronous
    rpcs to the datastore, each of which has no more entity groups represented
    than the maximum.  So, if a put() operation has 8 entity groups and the
    maximum is 3, we will send 3 rpcs, 2 with 3 entity groups and 1 with 2
    entity groups.  This is a performance optimization - in many cases
    multiple, small, concurrent rpcs will finish faster than a single large
    rpc.  The optimal value for this property will be application-specific, so
    experimentation is encouraged.
    """
    if not (isinstance(value, (int, long)) and value > 0):
      raise datastore_errors.BadArgumentError(
          'max_entity_groups_per_rpc should be a positive integer')
    return value

  @ConfigOption
  def max_allocate_ids_keys(value):
    """The maximum number of keys in a v4 AllocateIds rpc."""
    if not (isinstance(value, (int, long)) and value > 0):
      raise datastore_errors.BadArgumentError(
          'max_allocate_ids_keys should be a positive integer')
    return value

  @ConfigOption
  def max_rpc_bytes(value):
    """The maximum serialized size of a Get/Put/Delete without batching."""
    if not (isinstance(value, (int, long)) and value > 0):
      raise datastore_errors.BadArgumentError(
          'max_rpc_bytes should be a positive integer')
    return value

  @ConfigOption
  def max_get_keys(value):
    """The maximum number of keys in a Get without batching."""
    if not (isinstance(value, (int, long)) and value > 0):
      raise datastore_errors.BadArgumentError(
          'max_get_keys should be a positive integer')
    return value

  @ConfigOption
  def max_put_entities(value):
    """The maximum number of entities in a Put without batching."""
    if not (isinstance(value, (int, long)) and value > 0):
      raise datastore_errors.BadArgumentError(
          'max_put_entities should be a positive integer')
    return value

  @ConfigOption
  def max_delete_keys(value):
    """The maximum number of keys in a Delete without batching."""
    if not (isinstance(value, (int, long)) and value > 0):
      raise datastore_errors.BadArgumentError(
          'max_delete_keys should be a positive integer')
    return value
class MultiRpc(object):
  """A wrapper around multiple UserRPC objects.

  This provides an API similar to that of UserRPC, but wraps multiple
  RPCs such that e.g. .wait() blocks until all wrapped RPCs are
  complete, and .get_result() returns the combined results from all
  wrapped RPCs.

  Class methods:
    flatten(rpcs): Expand a list of UserRPCs and MultiRpcs
      into a list of UserRPCs.
    wait_any(rpcs): Call UserRPC.wait_any(flatten(rpcs)).
    wait_all(rpcs): Call UserRPC.wait_all(flatten(rpcs)).

  Instance methods:
    wait(): Wait for all RPCs.
    check_success(): Wait and then check success for all RPCs.
    get_result(): Wait for all, check successes, then merge
      all the results.

  Instance attributes:
    rpcs: The list of wrapped RPCs (returns a copy).
    state: The combined state of all RPCs.
  """

  def __init__(self, rpcs, extra_hook=None):
    """Constructor.

    Args:
      rpcs: A list of UserRPC and MultiRpc objects; it is flattened
        before being stored.
      extra_hook: Optional function to be applied to the final result
        or list of results.
    """
    self.__rpcs = self.flatten(rpcs)
    self.__extra_hook = extra_hook

  @property
  def rpcs(self):
    """Get a flattened list containing the RPCs wrapped.

    This returns a copy to prevent users from modifying the state.
    """
    return list(self.__rpcs)

  @property
  def state(self):
    """Get the combined state of the wrapped RPCs.

    This mimics the UserRPC.state property.  If all wrapped RPCs have
    the same state, that state is returned; otherwise, RUNNING is
    returned (which here really means 'neither fish nor flesh').
    """
    lo = apiproxy_rpc.RPC.FINISHING
    hi = apiproxy_rpc.RPC.IDLE
    for rpc in self.__rpcs:
      lo = min(lo, rpc.state)
      hi = max(hi, rpc.state)
    if lo == hi:
      return lo
    return apiproxy_rpc.RPC.RUNNING

  def wait(self):
    """Wait for all wrapped RPCs to finish.

    This mimics the UserRPC.wait() method.
    """
    apiproxy_stub_map.UserRPC.wait_all(self.__rpcs)

  def check_success(self):
    """Check success of all wrapped RPCs, failing if any of the failed.

    This mimics the UserRPC.check_success() method.

    NOTE: This first waits for all wrapped RPCs to finish before
    checking the success of any of them.  This makes debugging easier.
    """
    self.wait()
    for rpc in self.__rpcs:
      rpc.check_success()

  def get_result(self):
    """Return the combined results of all wrapped RPCs.

    This mimics the UserRPC.get_results() method.  Multiple results
    are combined using the following rules:

    1. If there are no wrapped RPCs, an empty list is returned.

    2. If exactly one RPC is wrapped, its result is returned.

    3. If more than one RPC is wrapped, the result is always a list,
       which is constructed from the wrapped results as follows:

       a. A wrapped result equal to None is ignored;

       b. A wrapped result that is a list (but not any other type of
          sequence!) has its elements added to the result list.

       c. Any other wrapped result is appended to the result list.

    After all results are combined, if __extra_hook is set, it is
    called with the combined results and its return value becomes the
    final result.

    NOTE: This first waits for all wrapped RPCs to finish, and then
    checks all their success.  This makes debugging easier.
    """
    self.check_success()
    if len(self.__rpcs) == 1:
      results = self.__rpcs[0].get_result()
    else:
      results = []
      for rpc in self.__rpcs:
        result = rpc.get_result()
        if isinstance(result, list):
          results.extend(result)
        elif result is not None:
          results.append(result)
    if self.__extra_hook is not None:
      results = self.__extra_hook(results)
    return results

  @classmethod
  def flatten(cls, rpcs):
    """Return a list of UserRPCs, expanding MultiRpcs in the argument list.

    For example: given 4 UserRPCs rpc1 through rpc4,
    flatten(rpc1, MultiRpc([rpc2, rpc3], rpc4)
    returns [rpc1, rpc2, rpc3, rpc4].

    Args:
      rpcs: A list of UserRPC and MultiRpc objects.

    Returns:
      A list of UserRPC objects.
    """
    flat = []
    for rpc in rpcs:
      if isinstance(rpc, MultiRpc):
        # Wrapped MultiRpcs were already flattened at construction time.
        flat.extend(rpc.__rpcs)
      else:
        if not isinstance(rpc, apiproxy_stub_map.UserRPC):
          raise datastore_errors.BadArgumentError(
              'Expected a list of UserRPC object (%r)' % (rpc,))
        flat.append(rpc)
    return flat

  @classmethod
  def wait_any(cls, rpcs):
    """Wait until one of the RPCs passed in is finished.

    This mimics UserRPC.wait_any().

    Args:
      rpcs: A list of UserRPC and MultiRpc objects.

    Returns:
      A UserRPC object or None.
    """
    return apiproxy_stub_map.UserRPC.wait_any(cls.flatten(rpcs))

  @classmethod
  def wait_all(cls, rpcs):
    """Wait until all RPCs passed in are finished.

    This mimics UserRPC.wait_all().

    Args:
      rpcs: A list of UserRPC and MultiRpc objects.
    """
    apiproxy_stub_map.UserRPC.wait_all(cls.flatten(rpcs))
907 class BaseConnection(object):
908 """Datastore connection base class.
910 NOTE: Do not instantiate this class; use Connection or
911 TransactionalConnection instead.
913 This is not a traditional database connection -- with App Engine, in
914 the end the connection is always implicit in the process state.
915 There is also no intent to be compatible with PEP 249 (Python's
916 Database-API). But it is a useful abstraction to have an explicit
917 object that manages the database interaction, and especially
918 transactions. Other settings related to the App Engine datastore
919 are also stored here (e.g. the RPC timeout).
921 A similar class in the Java API to the App Engine datastore is
922 DatastoreServiceConfig (but in Java, transaction state is always
923 held by the current thread).
925 To use transactions, call connection.new_transaction(). This
926 returns a new connection (an instance of the TransactionalConnection
927 subclass) which you should use for all operations in the
930 This model supports multiple unrelated concurrent transactions (but
931 not nested transactions as this concept is commonly understood in
932 the relational database world).
934 When the transaction is done, call .commit() or .rollback() on the
935 transactional connection. If .commit() returns False, the
936 transaction failed and none of your operations made it to the
937 datastore; if it returns True, all your operations were committed.
938 The transactional connection cannot be used once .commit() or
939 .rollback() is called.
941 Transactions are created lazily. The first operation that requires
942 a transaction handle will issue the low-level BeginTransaction
943 request and wait for it to return.
945 Transactions keep track of the entity group. All operations within
946 a transaction must use the same entity group. An entity group
947 (currently) comprises an app id, a namespace, and a top-level key (a
948 kind and an id or name). The first operation performed determines
949 the entity group. There is some special-casing when the first
950 operation is a put() of an entity with an incomplete key; in this case
951 the entity group is determined after the operation returns.
953 NOTE: the datastore stubs in the dev_appserver currently support
954 only a single concurrent transaction. Specifically, the (old) file
955 stub locks up if an attempt is made to start a new transaction while
956 a transaction is already in use, whereas the sqlite stub fails an
960 UNKNOWN_DATASTORE
= 0
961 MASTER_SLAVE_DATASTORE
= 1
962 HIGH_REPLICATION_DATASTORE
= 2
965 def __init__(self
, adapter
=None, config
=None):
968 All arguments should be specified as keyword arguments.
971 adapter: Optional AbstractAdapter subclass instance;
972 default IdentityAdapter.
973 config: Optional Configuration object.
976 adapter
= IdentityAdapter()
977 if not isinstance(adapter
, AbstractAdapter
):
978 raise datastore_errors
.BadArgumentError(
979 'invalid adapter argument (%r)' % (adapter
,))
980 self
.__adapter
= adapter
983 config
= Configuration()
984 elif not Configuration
.is_configuration(config
):
985 raise datastore_errors
.BadArgumentError(
986 'invalid config argument (%r)' % (config
,))
987 self
.__config
= config
989 self
.__pending
_rpcs
= set()
995 """The adapter used by this connection."""
996 return self
.__adapter
1000 """The default configuration used by this connection."""
1001 return self
.__config
1006 def _add_pending(self
, rpc
):
1007 """Add an RPC object to the list of pending RPCs.
1009 The argument must be a UserRPC object, not a MultiRpc object.
1011 assert not isinstance(rpc
, MultiRpc
)
1012 self
.__pending
_rpcs
.add(rpc
)
1014 def _remove_pending(self
, rpc
):
1015 """Remove an RPC object from the list of pending RPCs.
1017 If the argument is a MultiRpc object, the wrapped RPCs are removed
1018 from the list of pending RPCs.
1020 if isinstance(rpc
, MultiRpc
):
1023 for wrapped_rpc
in rpc
._MultiRpc
__rpcs
:
1024 self
._remove
_pending
(wrapped_rpc
)
1027 self
.__pending
_rpcs
.remove(rpc
)
1033 def is_pending(self
, rpc
):
1034 """Check whether an RPC object is currently pending.
1036 Note that 'pending' in this context refers to an RPC associated
1037 with this connection for which _remove_pending() hasn't been
1038 called yet; normally this is called by check_rpc_success() which
1039 itself is called by the various result hooks. A pending RPC may
1040 be in the RUNNING or FINISHING state.
1042 If the argument is a MultiRpc object, this returns true if at least
1043 one of its wrapped RPCs is pending.
1045 if isinstance(rpc
, MultiRpc
):
1046 for wrapped_rpc
in rpc
._MultiRpc
__rpcs
:
1047 if self
.is_pending(wrapped_rpc
):
1051 return rpc
in self
.__pending
_rpcs
1053 def get_pending_rpcs(self
):
1054 """Return (a copy of) the list of currently pending RPCs."""
1055 return set(self
.__pending
_rpcs
)
1057 def get_datastore_type(self
, app
=None):
1058 """Tries to get the datastore type for the given app.
1060 This function is only guaranteed to return something other than
1061 UNKNOWN_DATASTORE when running in production and querying the current app.
1063 return _GetDatastoreType(app
)
1065 def wait_for_all_pending_rpcs(self
):
1066 """Wait for all currently pending RPCs to complete."""
1067 while self
.__pending
_rpcs
:
1069 rpc
= apiproxy_stub_map
.UserRPC
.wait_any(self
.__pending
_rpcs
)
1075 logging
.info('wait_for_all_pending_rpcs(): exception in wait_any()',
1079 logging
.debug('wait_any() returned None')
1081 assert rpc
.state
== apiproxy_rpc
.RPC
.FINISHING
1082 if rpc
in self
.__pending
_rpcs
:
1090 self
.check_rpc_success(rpc
)
1093 logging
.info('wait_for_all_pending_rpcs(): '
1094 'exception in check_rpc_success()',
def create_rpc(self, config=None, service_name='datastore_v3'):
  """Create an RPC object using the configuration parameters.

  Args:
    config: Optional Configuration object.
    service_name: Optional datastore service name.

  Returns:
    A new UserRPC object with the designated settings.

  NOTES:

  (1) The RPC object returned can only be used to make a single call
      (for details see apiproxy_stub_map.UserRPC).

  (2) To make a call, use one of the specific methods on the
      Connection object, such as conn.put(entities).  This sends the
      call to the server but does not wait.  To wait for the call to
      finish and get the result, call rpc.get_result().
  """
  deadline = Configuration.deadline(config, self.__config)
  on_completion = Configuration.on_completion(config, self.__config)
  callback = None
  if on_completion is not None:
    # The callback closes over `rpc`, which is bound just below; by the
    # time the callback fires the binding exists.
    def callback():
      return on_completion(rpc)
  rpc = apiproxy_stub_map.UserRPC(service_name, deadline, callback)
  return rpc
def _set_request_read_policy(self, request, config=None):
  """Set the read policy on a request.

  This takes the read policy from the config argument or the
  configuration's default configuration, and if it is
  EVENTUAL_CONSISTENCY, sets the failover_ms field in the protobuf.

  Args:
    request: A protobuf with a failover_ms field.
    config: Optional Configuration object.
  """
  if not (hasattr(request, 'set_failover_ms') and hasattr(request, 'strong')):
    raise datastore_errors.BadRequestError(
        'read_policy is only supported on read operations.')

  # A UserRPC passed as config carries its own read_policy attribute.
  if isinstance(config, apiproxy_stub_map.UserRPC):
    read_policy = getattr(config, 'read_policy', None)
  else:
    read_policy = Configuration.read_policy(config)

  if read_policy is None:
    read_policy = self.__config.read_policy

  if read_policy == Configuration.APPLY_ALL_JOBS_CONSISTENCY:
    request.set_strong(True)
  elif read_policy == Configuration.EVENTUAL_CONSISTENCY:
    request.set_strong(False)
    # -1 means "any replica is acceptable".
    request.set_failover_ms(-1)
def _set_request_transaction(self, request):
  """Set the current transaction on a request.

  NOTE: This version of the method does nothing.  The version
  overridden by TransactionalConnection is the real thing.

  Args:
    request: A protobuf with a transaction field.

  Returns:
    A datastore_pb.Transaction object or None.
  """
  # Transaction-less connection: no transaction to attach.
  return None
def make_rpc_call(self, config, method, request, response,
                  get_result_hook=None, user_data=None,
                  service_name='datastore_v3'):
  """Make an RPC call.

  Except for the added config argument, this is a thin wrapper
  around UserRPC.make_call().

  Args:
    config: A Configuration object or None.  Defaults are taken from
      the connection's default configuration.
    method: The method name.
    request: The request protocol buffer.
    response: The response protocol buffer.
    get_result_hook: Optional get-result hook function.  If not None,
      this must be a function with exactly one argument, the RPC
      object (self).  Its return value is returned from get_result().
    user_data: Optional additional arbitrary data for the get-result
      hook function.  This can be accessed as rpc.user_data.  The
      type of this value is up to the service module.
    service_name: Optional datastore service name.

  Returns:
    The UserRPC object used for the call.
  """
  # A pre-built UserRPC may be passed in place of a config.
  if isinstance(config, apiproxy_stub_map.UserRPC):
    rpc = config
  else:
    rpc = self.create_rpc(config, service_name)
  rpc.make_call(method, request, response, get_result_hook, user_data)
  self._add_pending(rpc)
  return rpc
1212 def check_rpc_success(self
, rpc
):
1213 """Check for RPC success and translate exceptions.
1215 This wraps rpc.check_success() and should be called instead of that.
1217 This also removes the RPC from the list of pending RPCs, once it
1221 rpc: A UserRPC or MultiRpc object.
1224 Nothing if the call succeeded; various datastore_errors.Error
1225 subclasses if ApplicationError was raised by rpc.check_success().
1232 self
._remove
_pending
(rpc
)
1235 except apiproxy_errors
.ApplicationError
, err
:
1236 raise _ToDatastoreError(err
)
# Hard limits used when splitting batch operations across multiple RPCs.

# Maximum size (in bytes) of a single datastore RPC payload.
MAX_RPC_BYTES = 1024 * 1024

# Maximum number of keys per Get RPC batch.  Restored here: async_get
# reads self.MAX_GET_KEYS as the fallback when the configuration does
# not supply max_get_keys, but the constant was missing from this block.
MAX_GET_KEYS = 1000

# Maximum number of entities per Put RPC batch.
MAX_PUT_ENTITIES = 500

# Maximum number of keys per Delete RPC batch.
MAX_DELETE_KEYS = 500

# Maximum number of keys per (v4) AllocateIds RPC batch.
MAX_ALLOCATE_IDS_KEYS = 500

# Default cap on distinct entity groups addressed by a single RPC, used
# when the configuration does not specify max_entity_groups_per_rpc.
DEFAULT_MAX_ENTITY_GROUPS_PER_RPC = 10
def __get_max_entity_groups_per_rpc(self, config):
  """Internal helper: figures out max_entity_groups_per_rpc for the config."""
  configured = Configuration.max_entity_groups_per_rpc(config, self.__config)
  return configured or self.DEFAULT_MAX_ENTITY_GROUPS_PER_RPC
def _extract_entity_group(self, value):
  """Internal helper: extracts the entity group from a key or entity.

  Args:
    value: An entity_pb.EntityProto or an entity_pb.Reference.

  Returns:
    The first element of the key path (the entity group).
  """
  # Entities carry their key; normalize to a Reference first.
  if isinstance(value, entity_pb.EntityProto):
    value = value.key()
  return value.path().element(0)
def _map_and_group(self, values, map_fn, group_fn):
  """Internal helper: map values to keys and group by key.

  Here key is any object derived from an input value by map_fn, and
  which can be grouped by group_fn.

  Args:
    values: The values to be grouped by applying get_group(to_ref(value)).
    map_fn: A function that maps a value to a key to be grouped.
    group_fn: A function that groups the keys output by map_fn.

  Returns:
    A list where each element is a list of (key, index) pairs.  Here
    index is the location of the value from which the key was derived
    in the original list.
  """
  grouped = collections.defaultdict(list)
  for position, value in enumerate(values):
    mapped = map_fn(value)
    grouped[group_fn(mapped)].append((mapped, position))
  return grouped.values()
def __group_indexed_pbs_by_entity_group(self, values, to_ref):
  """Internal helper: group pbs by entity group.

  Args:
    values: The values to be grouped by entity group.
    to_ref: A function that translates a value to a Reference pb.

  Returns:
    A list where each element is a list of (pb, index) pairs.  Here
    index is the location of the value from which pb was derived in
    the original list.
  """
  def get_entity_group(ref):
    eg = self._extract_entity_group(ref)
    # Incomplete keys (no id/name yet) get a unique per-object bucket so
    # they are never batched as the same group.
    return (eg.type(), eg.id() or eg.name() or ('new', id(eg)))

  return self._map_and_group(values, to_ref, get_entity_group)
def __create_result_index_pairs(self, indexes):
  """Internal helper: build a function that ties an index with each result.

  Args:
    indexes: A list of integers.  A value x at location y in the list
      means that the result at location y in the result list needs to be
      at location x in the list of results returned to the user.

  Returns:
    A one-argument function pairing each result with its target index.
  """
  def create_result_index_pairs(results):
    return zip(results, indexes)
  return create_result_index_pairs
def __sort_result_index_pairs(self, extra_hook):
  """Builds a function that sorts the indexed results.

  Args:
    extra_hook: A function that the returned function will apply to its
      result before returning it.

  Returns:
    A function that takes a list of (result, index) pairs and reorders
    the results to match the order in which the input values associated
    with each result were originally provided.
  """
  def sort_result_index_pairs(result_index_pairs):
    # Scatter each result back into its original slot.
    ordered = [None] * len(result_index_pairs)
    for result, index in result_index_pairs:
      ordered[index] = result
    if extra_hook is not None:
      ordered = extra_hook(ordered)
    return ordered
  return sort_result_index_pairs
def _generate_pb_lists(self, grouped_values, base_size, max_count,
                       max_groups, config):
  """Internal helper: repeatedly yield a list of 2 elements.

  Args:
    grouped_values: A list of lists.  The inner lists consist of objects
      grouped by e.g. entity group or id sequence.
    base_size: An integer representing the base size of an rpc.  Used for
      splitting operations across multiple RPCs due to size limitations.
    max_count: An integer representing the maximum number of objects we
      can send in an rpc.  Used for splitting operations across multiple
      RPCs.
    max_groups: An integer representing the maximum number of groups we
      can have represented in an rpc.  Can be None, in which case no
      constraint.
    config: The config object, defining max rpc size in bytes.

  Yields:
    Repeatedly yields 2 element tuples.  The first element is a list of
    protobufs to send in one batch.  The second element is a list
    containing the original location of those protobufs (expressed as an
    index) in the input.
  """
  max_size = (Configuration.max_rpc_bytes(config, self.__config) or
              self.MAX_RPC_BYTES)
  pbs = []
  pb_indexes = []
  size = base_size
  num_groups = 0
  for indexed_pbs in grouped_values:
    num_groups += 1
    # Flush the current batch when adding this group would exceed the
    # per-RPC entity-group cap.
    if max_groups is not None and num_groups > max_groups:
      yield (pbs, pb_indexes)
      pbs = []
      pb_indexes = []
      size = base_size
      num_groups = 1
    for indexed_pb in indexed_pbs:
      (pb, index) = indexed_pb
      # Serialized length plus one byte of framing overhead.
      incr_size = pb.lengthString(pb.ByteSize()) + 1
      # A UserRPC passed as config means "single RPC, no splitting".
      if (not isinstance(config, apiproxy_stub_map.UserRPC) and
          (len(pbs) >= max_count
           or (pbs and size + incr_size > max_size))):
        yield (pbs, pb_indexes)
        pbs = []
        pb_indexes = []
        size = base_size
      pbs.append(pb)
      pb_indexes.append(index)
      size += incr_size
  yield (pbs, pb_indexes)
def _get_base_size(self, base_req):
  """Internal helper: return request size in bytes."""
  return base_req.ByteSize()
def get(self, keys):
  """Synchronous Get operation.

  Args:
    keys: An iterable of user-level key objects.

  Returns:
    A list of user-level entity objects and None values, corresponding
    1:1 to the argument keys.  A None means there is no entity for the
    corresponding key.
  """
  # Delegate to the async variant with the default config and block.
  rpc = self.async_get(None, keys)
  return rpc.get_result()
def async_get(self, config, keys, extra_hook=None):
  """Asynchronous Get operation.

  Args:
    config: A Configuration object or None.  Defaults are taken from
      the connection's default configuration.
    keys: An iterable of user-level key objects.
    extra_hook: Optional function to be called on the result once the
      RPC has completed.

  Returns:
    A MultiRpc object.
  """
  def make_get_call(req, pbs, user_data=None):
    req.key_list().extend(pbs)
    self._set_request_transaction(req)
    resp = datastore_pb.GetResponse()
    return self.make_rpc_call(config, 'Get', req, resp,
                              self.__get_hook, user_data)

  base_req = datastore_pb.GetRequest()
  self._set_request_read_policy(base_req, config)

  # A pre-built UserRPC or a single key means one unsplit call.
  if isinstance(config, apiproxy_stub_map.UserRPC) or len(keys) <= 1:
    pbs = [self.__adapter.key_to_pb(key) for key in keys]
    return make_get_call(base_req, pbs, extra_hook)

  base_size = self._get_base_size(base_req)
  max_count = (Configuration.max_get_keys(config, self.__config) or
               self.MAX_GET_KEYS)

  if base_req.has_strong():
    is_read_current = base_req.strong()
  else:
    is_read_current = (self.get_datastore_type() ==
                       BaseConnection.HIGH_REPLICATION_DATASTORE)

  indexed_keys_by_entity_group = self.__group_indexed_pbs_by_entity_group(
      keys, self.__adapter.key_to_pb)

  # Current reads outside a transaction are capped per entity group.
  if is_read_current and not base_req.has_transaction():
    max_egs_per_rpc = self.__get_max_entity_groups_per_rpc(config)
  else:
    max_egs_per_rpc = None

  pbsgen = self._generate_pb_lists(
      indexed_keys_by_entity_group, base_size, max_count, max_egs_per_rpc,
      config)

  rpcs = []
  for pbs, indexes in pbsgen:
    req = datastore_pb.GetRequest()
    req.CopyFrom(base_req)
    rpcs.append(make_get_call(req, pbs,
                              self.__create_result_index_pairs(indexes)))
  return MultiRpc(rpcs, self.__sort_result_index_pairs(extra_hook))
def __get_hook(self, rpc):
  """Internal method used as get_result_hook for Get operation."""
  self.check_rpc_success(rpc)
  entities = []
  for group in rpc.response.entity_list():
    if group.has_entity():
      entity = self.__adapter.pb_to_entity(group.entity())
    else:
      # Missing entity for this key: report None in its slot.
      entity = None
    entities.append(entity)
  if rpc.user_data is not None:
    entities = rpc.user_data(entities)
  return entities
def get_indexes(self):
  """Synchronous get indexes operation.

  Returns:
    user-level indexes representation
  """
  rpc = self.async_get_indexes(None)
  return rpc.get_result()
def async_get_indexes(self, config, extra_hook=None, _app=None):
  """Asynchronous get indexes operation.

  Args:
    config: A Configuration object or None.  Defaults are taken from
      the connection's default configuration.
    extra_hook: Optional function to be called once the RPC has
      completed.
    _app: Optional application ID; defaults to the current app.

  Returns:
    A UserRPC object.
  """
  req = api_base_pb.StringProto()
  req.set_value(datastore_types.ResolveAppId(_app))
  resp = datastore_pb.CompositeIndices()
  return self.make_rpc_call(config, 'GetIndices', req, resp,
                            self.__get_indexes_hook, extra_hook)
def __get_indexes_hook(self, rpc):
  """Internal method used as get_result_hook for GetIndices operation."""
  self.check_rpc_success(rpc)
  indexes = [self.__adapter.pb_to_index(index)
             for index in rpc.response.index_list()]
  # NOTE(review): the user_data guard line was garbled in the paste;
  # reconstructed as the usual truthiness check — confirm against SDK.
  if rpc.user_data:
    indexes = rpc.user_data(indexes)
  return indexes
def put(self, entities):
  """Synchronous Put operation.

  Args:
    entities: An iterable of user-level entity objects.

  Returns:
    A list of user-level key objects, corresponding 1:1 to the
    argument entities.

  NOTE: If any of the entities has an incomplete key, this will
  *not* patch up those entities with the complete key.
  """
  rpc = self.async_put(None, entities)
  return rpc.get_result()
def async_put(self, config, entities, extra_hook=None):
  """Asynchronous Put operation.

  Args:
    config: A Configuration object or None.  Defaults are taken from
      the connection's default configuration.
    entities: An iterable of user-level entity objects.
    extra_hook: Optional function to be called on the result once the
      RPC has completed.

  Returns:
    A MultiRpc object.

  NOTE: If any of the entities has an incomplete key, this will
  *not* patch up those entities with the complete key.
  """
  def make_put_call(req, pbs, user_data=None):
    req.entity_list().extend(pbs)
    self._set_request_transaction(req)
    resp = datastore_pb.PutResponse()
    return self.make_rpc_call(config, 'Put', req, resp,
                              self.__put_hook, user_data)

  base_req = datastore_pb.PutRequest()
  if Configuration.force_writes(config, self.__config):
    base_req.set_force(True)

  # A pre-built UserRPC or a single entity means one unsplit call.
  if isinstance(config, apiproxy_stub_map.UserRPC) or len(entities) <= 1:
    pbs = [self.__adapter.entity_to_pb(entity) for entity in entities]
    return make_put_call(base_req, pbs, extra_hook)

  base_size = self._get_base_size(base_req)
  max_count = (Configuration.max_put_entities(config, self.__config) or
               self.MAX_PUT_ENTITIES)
  indexed_entities_by_entity_group = self.__group_indexed_pbs_by_entity_group(
      entities, self.__adapter.entity_to_pb)
  # Writes outside a transaction are capped per entity group per RPC.
  if not base_req.has_transaction():
    max_egs_per_rpc = self.__get_max_entity_groups_per_rpc(config)
  else:
    max_egs_per_rpc = None

  pbsgen = self._generate_pb_lists(
      indexed_entities_by_entity_group, base_size, max_count, max_egs_per_rpc,
      config)

  rpcs = []
  for pbs, indexes in pbsgen:
    req = datastore_pb.PutRequest()
    req.CopyFrom(base_req)
    rpcs.append(make_put_call(req, pbs,
                              self.__create_result_index_pairs(indexes)))
  return MultiRpc(rpcs, self.__sort_result_index_pairs(extra_hook))
def __put_hook(self, rpc):
  """Internal method used as get_result_hook for Put operation."""
  self.check_rpc_success(rpc)
  keys = [self.__adapter.pb_to_key(pb)
          for pb in rpc.response.key_list()]
  if rpc.user_data is not None:
    keys = rpc.user_data(keys)
  return keys
def delete(self, keys):
  """Synchronous Delete operation.

  Args:
    keys: An iterable of user-level key objects.

  Returns:
    None.
  """
  rpc = self.async_delete(None, keys)
  return rpc.get_result()
def async_delete(self, config, keys, extra_hook=None):
  """Asynchronous Delete operation.

  Args:
    config: A Configuration object or None.  Defaults are taken from
      the connection's default configuration.
    keys: An iterable of user-level key objects.
    extra_hook: Optional function to be called once the RPC has
      completed.

  Returns:
    A MultiRpc object.
  """
  def make_delete_call(req, pbs, user_data=None):
    req.key_list().extend(pbs)
    self._set_request_transaction(req)
    resp = datastore_pb.DeleteResponse()
    return self.make_rpc_call(config, 'Delete', req, resp,
                              self.__delete_hook, user_data)

  base_req = datastore_pb.DeleteRequest()
  if Configuration.force_writes(config, self.__config):
    base_req.set_force(True)

  # A pre-built UserRPC or a single key means one unsplit call.
  if isinstance(config, apiproxy_stub_map.UserRPC) or len(keys) <= 1:
    pbs = [self.__adapter.key_to_pb(key) for key in keys]
    return make_delete_call(base_req, pbs, extra_hook)

  base_size = self._get_base_size(base_req)
  max_count = (Configuration.max_delete_keys(config, self.__config) or
               self.MAX_DELETE_KEYS)
  # Deletes outside a transaction are capped per entity group per RPC.
  if not base_req.has_transaction():
    max_egs_per_rpc = self.__get_max_entity_groups_per_rpc(config)
  else:
    max_egs_per_rpc = None
  indexed_keys_by_entity_group = self.__group_indexed_pbs_by_entity_group(
      keys, self.__adapter.key_to_pb)

  pbsgen = self._generate_pb_lists(
      indexed_keys_by_entity_group, base_size, max_count, max_egs_per_rpc,
      config)

  # Delete has no per-key results, so no index bookkeeping is needed.
  rpcs = []
  for pbs, _ in pbsgen:
    req = datastore_pb.DeleteRequest()
    req.CopyFrom(base_req)
    rpcs.append(make_delete_call(req, pbs))
  return MultiRpc(rpcs, extra_hook)
def __delete_hook(self, rpc):
  """Internal method used as get_result_hook for Delete operation."""
  self.check_rpc_success(rpc)
  # Delete produces no result; the hook only gets a completion signal.
  if rpc.user_data is not None:
    rpc.user_data(None)
def begin_transaction(self, app):
  """Synchronous BeginTransaction operation.

  NOTE: In most cases the new_transaction() method is preferred,
  since that returns a TransactionalConnection object which will
  begin the transaction lazily.

  Args:
    app: Application ID.

  Returns:
    A datastore_pb.Transaction object.
  """
  rpc = self.async_begin_transaction(None, app)
  return rpc.get_result()
def async_begin_transaction(self, config, app):
  """Asynchronous BeginTransaction operation.

  Args:
    config: A configuration object or None.  Defaults are taken from
      the connection's default configuration.
    app: Application ID.

  Returns:
    A UserRPC object.
  """
  if not isinstance(app, basestring) or not app:
    raise datastore_errors.BadArgumentError(
        'begin_transaction requires an application id argument (%r)' %
        (app,))

  req = datastore_pb.BeginTransactionRequest()
  req.set_app(app)
  # Cross-group transactions must be requested explicitly via the
  # xg transaction option.
  if (TransactionOptions.xg(config, self.__config)):
    req.set_allow_multiple_eg(True)
  resp = datastore_pb.Transaction()
  rpc = self.make_rpc_call(config, 'BeginTransaction', req, resp,
                           self.__begin_transaction_hook)
  return rpc
def __begin_transaction_hook(self, rpc):
  """Internal method used as get_result_hook for BeginTransaction."""
  self.check_rpc_success(rpc)
  # The response protobuf is itself the Transaction object.
  return rpc.response
class Connection(BaseConnection):
  """Transaction-less connection class.

  This contains those operations that are not allowed on transactional
  connections.  (Currently only allocate_ids and reserve_key_ids.)
  """

  @_positional(1)
  def __init__(self, adapter=None, config=None):
    """Constructor.

    All arguments should be specified as keyword arguments.

    Args:
      adapter: Optional AbstractAdapter subclass instance;
        default IdentityAdapter.
      config: Optional Configuration object.
    """
    super(Connection, self).__init__(adapter=adapter, config=config)
    # Re-bind under this class's name-mangled attributes so sibling
    # methods can use them without going through the base class.
    self.__adapter = self.adapter
    self.__config = self.config
def new_transaction(self, config=None):
  """Create a new transactional connection based on this one.

  This is different from, and usually preferred over, the
  begin_transaction() method; new_transaction() returns a new
  TransactionalConnection object.

  Args:
    config: A configuration object for the new connection, merged
      with this connection's config.

  Returns:
    A TransactionalConnection sharing this connection's adapter.
  """
  merged_config = self.__config.merge(config)
  return TransactionalConnection(adapter=self.__adapter,
                                 config=merged_config)
def __to_v4_key(self, ref):
  """Convert a valid v3 Reference pb to a v4 Key pb."""
  key = entity_v4_pb.Key()
  key.mutable_partition_id().set_dataset_id(ref.app())
  if ref.name_space():
    key.mutable_partition_id().set_namespace(ref.name_space())
  for el_v3 in ref.path().element_list():
    el_v4 = key.add_path_element()
    el_v4.set_kind(el_v3.type())
    # A path element carries either a numeric id or a name.
    if el_v3.has_id():
      el_v4.set_id(el_v3.id())
    if el_v3.has_name():
      el_v4.set_name(el_v3.name())
  return key
def __to_v3_reference(self, key):
  """Convert a valid v4 Key pb to a v3 Reference pb."""
  ref = entity_pb.Reference()
  ref.set_app(key.partition_id().dataset_id())
  if key.partition_id().has_namespace():
    ref.set_name_space(key.partition_id().namespace())
  for el_v4 in key.path_element_list():
    el_v3 = ref.mutable_path().add_element()
    el_v3.set_type(el_v4.kind())
    # A path element carries either a numeric id or a name.
    if el_v4.has_id():
      el_v3.set_id(el_v4.id())
    if el_v4.has_name():
      el_v3.set_name(el_v4.name())
  return ref
def allocate_ids(self, key, size=None, max=None):
  """Synchronous AllocateIds operation.

  Exactly one of size and max must be specified.

  Args:
    key: A user-level key object.
    size: Optional number of IDs to allocate.
    max: Optional maximum ID to allocate.

  Returns:
    A pair (start, end) giving the (inclusive) range of IDs allocation.
  """
  rpc = self.async_allocate_ids(None, key, size, max)
  return rpc.get_result()
def async_allocate_ids(self, config, key, size=None, max=None,
                       extra_hook=None):
  """Asynchronous AllocateIds operation.

  Args:
    config: A Configuration object or None.  Defaults are taken from
      the connection's default configuration.
    key: A user-level key object.
    size: Optional number of IDs to allocate.
    max: Optional maximum ID to allocate.
    extra_hook: Optional function to be called on the result once the
      RPC has completed.

  Returns:
    A UserRPC object.
  """
  if size is not None:
    if max is not None:
      raise datastore_errors.BadArgumentError(
          'Cannot allocate ids using both size and max')
    if not isinstance(size, (int, long)):
      raise datastore_errors.BadArgumentError('Invalid size (%r)' % (size,))
    if size > _MAX_ID_BATCH_SIZE:
      raise datastore_errors.BadArgumentError(
          'Cannot allocate more than %s ids at a time; received %s'
          % (_MAX_ID_BATCH_SIZE, size))
    if size <= 0:
      raise datastore_errors.BadArgumentError(
          'Cannot allocate less than 1 id; received %s' % size)
  if max is not None:
    if not isinstance(max, (int, long)):
      raise datastore_errors.BadArgumentError('Invalid max (%r)' % (max,))
    if max < 0:
      raise datastore_errors.BadArgumentError(
          'Cannot allocate a range with a max less than 0 id; received %s' %
          max)
  req = datastore_pb.AllocateIdsRequest()
  req.mutable_model_key().CopyFrom(self.__adapter.key_to_pb(key))
  if size is not None:
    req.set_size(size)
  if max is not None:
    req.set_max(max)
  resp = datastore_pb.AllocateIdsResponse()
  rpc = self.make_rpc_call(config, 'AllocateIds', req, resp,
                           self.__allocate_ids_hook, extra_hook)
  return rpc
def __allocate_ids_hook(self, rpc):
  """Internal method used as get_result_hook for AllocateIds."""
  self.check_rpc_success(rpc)
  # Inclusive (start, end) of the allocated ID range.
  pair = rpc.response.start(), rpc.response.end()
  if rpc.user_data is not None:
    pair = rpc.user_data(pair)
  return pair
def _reserve_keys(self, keys):
  """Synchronous AllocateIds operation to reserve the given keys.

  Sends one or more v4 AllocateIds rpcs with keys to reserve.
  Reserved keys must be complete and must have valid ids.

  Args:
    keys: Iterable of user-level keys.
  """
  self._async_reserve_keys(None, keys).get_result()
def _async_reserve_keys(self, config, keys, extra_hook=None):
  """Asynchronous AllocateIds operation to reserve the given keys.

  Sends one or more v4 AllocateIds rpcs with keys to reserve.
  Reserved keys must be complete and must have valid ids.

  Args:
    config: A Configuration object or None to use Connection default.
    keys: Iterable of user-level keys.
    extra_hook: Optional function to be called on rpc result.

  Returns:
    None, or the result of user-supplied extra_hook.
  """
  def to_id_key(key):
    # NOTE(review): the single-element branch was garbled in the paste;
    # reconstructed as a shared bucket for root keys — confirm against SDK.
    if key.path().element_size() == 1:
      return 'root_idkey'
    else:
      eg = self._extract_entity_group(key)
      return (eg.type(), eg.id() or eg.name())

  keys_by_idkey = self._map_and_group(keys, self.__adapter.key_to_pb,
                                      to_id_key)
  max_count = (Configuration.max_allocate_ids_keys(config, self.__config) or
               self.MAX_ALLOCATE_IDS_KEYS)

  rpcs = []
  pbsgen = self._generate_pb_lists(keys_by_idkey, 0, max_count, None, config)
  for pbs, _ in pbsgen:
    req = datastore_v4a_pb.AllocateIdsRequest()
    req.reserve_list().extend([self.__to_v4_key(key) for key in pbs])
    resp = datastore_v4a_pb.AllocateIdsResponse()
    # NOTE(review): the trailing service-name argument was garbled in the
    # paste; reconstructed as 'datastore_v4' — confirm against SDK.
    rpcs.append(self.make_rpc_call(config, 'AllocateIds', req, resp,
                                   self.__reserve_keys_hook, extra_hook,
                                   'datastore_v4'))
  return MultiRpc(rpcs)
def __reserve_keys_hook(self, rpc):
  """Internal get_result_hook for _reserve_keys."""
  self.check_rpc_success(rpc)
  # Only the user hook (if any) produces a result.
  if rpc.user_data is not None:
    return rpc.user_data(rpc.response)
class TransactionOptions(Configuration):
  """An immutable class that contains options for a transaction."""

  NESTED = 1
  """Create a nested transaction under an existing one."""

  MANDATORY = 2
  """Always propagate an existing transaction, throw an exception if there is
  no existing transaction."""

  ALLOWED = 3
  """If there is an existing transaction propagate it."""

  INDEPENDENT = 4
  """Always use a new transaction, pausing any existing transactions."""

  _PROPAGATION = frozenset((NESTED, MANDATORY, ALLOWED, INDEPENDENT))

  # NOTE(review): the option decorators were garbled in the paste;
  # reconstructed as @ConfigOption (the Configuration option mechanism)
  # — confirm against the SDK.

  @ConfigOption
  def propagation(value):
    """How existing transactions should be handled.

    One of NESTED, MANDATORY, ALLOWED, INDEPENDENT.  The interpertation of
    these types is up to higher level run-in-transaction implementations.

    WARNING: Using anything other than NESTED for the propagation flag
    can have strange consequences.  When using ALLOWED or MANDATORY, if
    an exception is raised, the transaction is likely not safe to
    commit.  When using INDEPENDENT it is not generally safe to return
    values read to the caller (as they were not read in the caller's
    transaction).

    Raises: datastore_errors.BadArgumentError if value is not reconized.
    """
    if value not in TransactionOptions._PROPAGATION:
      raise datastore_errors.BadArgumentError('Unknown propagation value (%r)' %
                                              (value,))
    return value

  @ConfigOption
  def xg(value):
    """Whether to allow cross-group transactions.

    Raises: datastore_errors.BadArgumentError if value is not a bool.
    """
    if not isinstance(value, bool):
      raise datastore_errors.BadArgumentError(
          'xg argument should be bool (%r)' % (value,))
    return value

  @ConfigOption
  def retries(value):
    """How many retries to attempt on the transaction.

    The exact retry logic is implemented in higher level run-in-transaction
    implementations.

    Raises: datastore_errors.BadArgumentError if value is not an integer or
    is not greater than zero.
    """
    datastore_types.ValidateInteger(value,
                                    'retries',
                                    datastore_errors.BadArgumentError,
                                    zero_ok=True)
    return value

  @ConfigOption
  def app(value):
    """The application in which to perform the transaction.

    Raises: datastore_errors.BadArgumentError if value is not a string
    or is the empty string.
    """
    datastore_types.ValidateString(value,
                                   'app',
                                   datastore_errors.BadArgumentError)
    return value
2005 class TransactionalConnection(BaseConnection
):
2006 """A connection specific to one transaction.
2008 It is possible to pass the transaction and entity group to the
2009 constructor, but typically the transaction is lazily created by
2010 _get_transaction() when the first operation is started.
2015 adapter
=None, config
=None, transaction
=None, entity_group
=None):
2018 All arguments should be specified as keyword arguments.
2021 adapter: Optional AbstractAdapter subclass instance;
2022 default IdentityAdapter.
2023 config: Optional Configuration object.
2024 transaction: Optional datastore_db.Transaction object.
2025 entity_group: Deprecated, do not use.
2027 super(TransactionalConnection
, self
).__init
__(adapter
=adapter
,
2029 self
.__adapter
= self
.adapter
2030 if transaction
is None:
2031 app
= TransactionOptions
.app(self
.config
)
2032 app
= datastore_types
.ResolveAppId(TransactionOptions
.app(self
.config
))
2033 self
.__transaction
_rpc
= self
.async_begin_transaction(None, app
)
2035 if not isinstance(transaction
, datastore_pb
.Transaction
):
2036 raise datastore_errors
.BadArgumentError(
2037 'Invalid transaction (%r)' % (transaction
,))
2038 self
.__transaction
= transaction
2039 self
.__transaction
_rpc
= None
2040 self
.__finished
= False
2042 def _get_base_size(self
, base_req
):
2043 """Internal helper: return size in bytes plus room for transaction."""
2044 return (super(TransactionalConnection
, self
)._get
_base
_size
(base_req
) +
2045 self
.transaction
.lengthString(self
.transaction
.ByteSize()) + 1)
2049 return self
.__finished
2052 def transaction(self
):
2053 if self
.__transaction
_rpc
is not None:
2054 self
.__transaction
= self
.__transaction
_rpc
.get_result()
2055 self
.__transaction
_rpc
= None
2056 return self
.__transaction
2058 def _set_request_transaction(self
, request
):
2059 """Set the current transaction on a request.
2061 This calls _get_transaction() (see below). The transaction object
2062 returned is both set as the transaction field on the request
2063 object and returned.
2066 request: A protobuf with a transaction field.
2069 A datastore_pb.Transaction object or None.
2072 raise datastore_errors
.BadRequestError(
2073 'Cannot start a new operation in a finished transaction.')
2074 transaction
= self
.transaction
2075 request
.mutable_transaction().CopyFrom(transaction
)
2078 def _end_transaction(self
):
2079 """Finish the current transaction.
2081 This blocks waiting for all pending RPCs to complete, and then
2082 marks the connection as finished. After that no more operations
2083 can be started using this connection.
2086 A datastore_pb.Transaction object or None.
2089 datastore_errors.BadRequestError if the transaction is already
2093 raise datastore_errors
.BadRequestError(
2094 'The transaction is already finished.')
2097 self
.wait_for_all_pending_rpcs()
2098 assert not self
.get_pending_rpcs()
2099 transaction
= self
.transaction
2100 self
.__finished
= True
2101 self
.__transaction
= None
2107 """Synchronous Commit operation.
2110 True if the transaction was successfully committed. False if
2111 the backend reported a concurrent transaction error.
2115 rpc
= self
.create_rpc()
2116 rpc
= self
.async_commit(rpc
)
2119 return rpc
.get_result()
2121 def async_commit(self
, config
):
2122 """Asynchronous Commit operation.
2125 config: A Configuration object or None. Defaults are taken from
2126 the connection's default configuration.
2131 transaction
= self
._end
_transaction
()
2132 if transaction
is None:
2134 resp
= datastore_pb
.CommitResponse()
2135 rpc
= self
.make_rpc_call(config
, 'Commit', transaction
, resp
,
2139 def __commit_hook(self
, rpc
):
2140 """Internal method used as get_result_hook for Commit."""
2143 except apiproxy_errors
.ApplicationError
, err
:
2144 if err
.application_error
== datastore_pb
.Error
.CONCURRENT_TRANSACTION
:
2147 raise _ToDatastoreError(err
)
2154 """Synchronous Rollback operation."""
2155 rpc
= self
.async_rollback(None)
2158 return rpc
.get_result()
2160 def async_rollback(self
, config
):
2161 """Asynchronous Rollback operation.
2164 config: A Configuration object or None. Defaults are taken from
2165 the connection's default configuration.
2170 transaction
= self
._end
_transaction
()
2171 if transaction
is None:
2173 resp
= api_base_pb
.VoidProto()
2174 rpc
= self
.make_rpc_call(config
, 'Rollback', transaction
, resp
,
2175 self
.__rollback
_hook
)
2178 def __rollback_hook(self
, rpc
):
2179 """Internal method used as get_result_hook for Rollback."""
2180 self
.check_rpc_success(rpc
)
def _ToDatastoreError(err):
  """Converts an apiproxy.ApplicationError to an error in datastore_errors.

  Args:
    err: An apiproxy.ApplicationError object.

  Returns:
    An instance of a subclass of datastore_errors.Error.
  """
  return _DatastoreExceptionFromErrorCodeAndDetail(err.application_error,
                                                   err.error_detail)
def _DatastoreExceptionFromErrorCodeAndDetail(error, detail):
  """Converts a datastore_pb.Error into a datastore_errors.Error.

  Args:
    error: A member of the datastore_pb.Error enumeration.
    detail: A string providing extra details about the error.

  Returns:
    An instance of a subclass of datastore_errors.Error.
  """
  # Map backend error codes to user-visible exception classes; anything
  # unrecognized falls back to the generic datastore_errors.Error.
  exception_class = {
      datastore_pb.Error.BAD_REQUEST: datastore_errors.BadRequestError,
      datastore_pb.Error.CONCURRENT_TRANSACTION:
          datastore_errors.TransactionFailedError,
      datastore_pb.Error.INTERNAL_ERROR: datastore_errors.InternalError,
      datastore_pb.Error.NEED_INDEX: datastore_errors.NeedIndexError,
      datastore_pb.Error.TIMEOUT: datastore_errors.Timeout,
      datastore_pb.Error.BIGTABLE_ERROR: datastore_errors.Timeout,
      datastore_pb.Error.COMMITTED_BUT_STILL_APPLYING:
          datastore_errors.CommittedButStillApplying,
      datastore_pb.Error.CAPABILITY_DISABLED:
          apiproxy_errors.CapabilityDisabledError,
  }.get(error, datastore_errors.Error)

  if detail is None:
    return exception_class()
  else:
    return exception_class(detail)