6 from .google_imports
import datastore
# For taskqueue coordination
7 from .google_imports
import datastore_errors
8 from .google_imports
import memcache
9 from .google_imports
import namespace_manager
10 from .google_imports
import urlfetch
11 from .google_imports
import datastore_rpc
12 from .google_imports
import entity_pb
14 from .google_imports
import ProtocolBuffer
16 from . import key
as key_module
18 from . import tasklets
19 from . import eventloop
22 __all__
= ['Context', 'ContextOptions', 'TransactionOptions', 'AutoBatcher',
23 'EVENTUAL_CONSISTENCY',
26 _LOCK_TIME
= 32 # Time to lock out memcache.add() after datastore updates.
27 _LOCKED
= 0 # Special value to store in memcache indicating locked value.
30 # Constant for read_policy.
31 EVENTUAL_CONSISTENCY
= datastore_rpc
.Configuration
.EVENTUAL_CONSISTENCY
34 class ContextOptions(datastore_rpc
.Configuration
):
35 """Configuration options that may be passed along with get/put/delete."""
37 @datastore_rpc.ConfigOption
39 if not isinstance(value
, bool):
40 raise datastore_errors
.BadArgumentError(
41 'use_cache should be a bool (%r)' % (value
,))
44 @datastore_rpc.ConfigOption
45 def use_memcache(value
):
46 if not isinstance(value
, bool):
47 raise datastore_errors
.BadArgumentError(
48 'use_memcache should be a bool (%r)' % (value
,))
51 @datastore_rpc.ConfigOption
52 def use_datastore(value
):
53 if not isinstance(value
, bool):
54 raise datastore_errors
.BadArgumentError(
55 'use_datastore should be a bool (%r)' % (value
,))
58 @datastore_rpc.ConfigOption
59 def memcache_timeout(value
):
60 if not isinstance(value
, (int, long)):
61 raise datastore_errors
.BadArgumentError(
62 'memcache_timeout should be an integer (%r)' % (value
,))
65 @datastore_rpc.ConfigOption
66 def max_memcache_items(value
):
67 if not isinstance(value
, (int, long)):
68 raise datastore_errors
.BadArgumentError(
69 'max_memcache_items should be an integer (%r)' % (value
,))
72 @datastore_rpc.ConfigOption
73 def memcache_deadline(value
):
74 if not isinstance(value
, (int, long)):
75 raise datastore_errors
.BadArgumentError(
76 'memcache_deadline should be an integer (%r)' % (value
,))
79 class TransactionOptions(ContextOptions
, datastore_rpc
.TransactionOptions
):
80 """Support both context options and transaction options."""
84 # options and config can be used interchangeably.
85 _OPTION_TRANSLATIONS
= {
90 def _make_ctx_options(ctx_options
, config_cls
=ContextOptions
):
91 """Helper to construct a ContextOptions object from keyword arguments.
94 ctx_options: A dict of keyword arguments.
95 config_cls: Optional Configuration class to use, default ContextOptions.
97 Note that either 'options' or 'config' can be used to pass another
98 Configuration object, but not both. If another Configuration
99 object is given it provides default values.
102 A Configuration object, or None if ctx_options is empty.
106 for key
in list(ctx_options
):
107 translation
= _OPTION_TRANSLATIONS
.get(key
)
109 if translation
in ctx_options
:
110 raise ValueError('Cannot specify %s and %s at the same time' %
112 ctx_options
[translation
] = ctx_options
.pop(key
)
113 return config_cls(**ctx_options
)
116 class AutoBatcher(object):
117 """Batches multiple async calls if they share the same rpc options.
119 Here is an example to explain what this class does.
121 Life of a key.get_async(options) API call:
122 *) Key gets the singleton Context instance and invokes Context.get.
123 *) Context.get calls Context._get_batcher.add(key, options). This
124 returns a future "fut" as the return value of key.get_async.
125 At this moment, key.get_async returns.
127 *) When more than "limit" number of _get_batcher.add() was called,
128 _get_batcher invokes its self._todo_tasklet, Context._get_tasklet,
129 with the list of keys seen so far.
130 *) Context._get_tasklet fires a MultiRPC and waits on it.
131 *) Upon MultiRPC completion, Context._get_tasklet passes on the results
132 to the respective "fut" from key.get_async.
134 *) If user calls "fut".get_result() before "limit" number of add() was called,
135 "fut".get_result() will repeatedly call eventloop.run1().
136 *) After processing immediate callbacks, eventloop will run idlers.
137 AutoBatcher._on_idle is an idler.
138 *) _on_idle will run the "todo_tasklet" before the batch is full.
140 So the engine is todo_tasklet, which is a proxy tasklet that can combine
141 arguments into batches and passes along results back to respective futures.
142 This class is mainly a helper that invokes todo_tasklet with the right
143 arguments at the right time.
145 So simple and obvious.
148 def __init__(self
, todo_tasklet
, limit
):
152 todo_tasklet: the tasklet that actually fires RPC and waits on a MultiRPC.
153 It should take a list of (future, arg) pairs and an "options" as
154 arguments. "options" are rpc options.
155 limit: max number of items to batch for each distinct value of "options".
157 self
._todo
_tasklet
= todo_tasklet
159 # A map from "options" to a list of (future, arg) tuple.
160 # future is the future return from a single async operations.
162 self
._running
= [] # A list of in-flight todo_tasklet futures.
163 self
._cache
= {} # Cache of in-flight todo_tasklet futures.
166 return '%s(%s)' % (self
.__class
__.__name
__, self
._todo
_tasklet
.__name
__)
def run_queue(self, options, todo):
    """Actually run the _todo_tasklet for one batch of queued work.

    Args:
      options: rpc options shared by every item in this batch.
      todo: list of (future, arg) pairs accumulated for these options.
    """
    tasklet = self._todo_tasklet
    utils.logging_debug('AutoBatcher(%s): %d items',
                        tasklet.__name__, len(todo))
    # Fire the proxy tasklet and track its future as in-flight work.
    in_flight = tasklet(todo, options)
    self._running.append(in_flight)
    # Once the batch finishes, propagate results/exceptions to the
    # individual item futures.
    in_flight.add_callback(self._finished_callback, in_flight, todo)
178 """An idler eventloop can run.
180 Eventloop calls this when it has finished processing all immediate
181 callbacks. This method runs _todo_tasklet even before the batch is full.
183 if not self
.action():
187 def add(self
, arg
, options
=None):
188 """Adds an arg and gets back a future.
191 arg: one argument for _todo_tasklet.
192 options: rpc options.
195 An instance of future, representing the result of running
196 _todo_tasklet without batching.
198 fut
= tasklets
.Future('%s.add(%s, %s)' % (self
, arg
, options
))
199 todo
= self
._queues
.get(options
)
201 utils
.logging_debug('AutoBatcher(%s): creating new queue for %r',
202 self
._todo
_tasklet
.__name
__, options
)
204 eventloop
.add_idle(self
._on
_idle
)
205 todo
= self
._queues
[options
] = []
206 todo
.append((fut
, arg
))
207 if len(todo
) >= self
._limit
:
208 del self
._queues
[options
]
209 self
.run_queue(options
, todo
)
212 def add_once(self
, arg
, options
=None):
213 cache_key
= (arg
, options
)
214 fut
= self
._cache
.get(cache_key
)
216 fut
= self
.add(arg
, options
)
217 self
._cache
[cache_key
] = fut
218 fut
.add_immediate_callback(self
._cache
.__delitem
__, cache_key
)
222 queues
= self
._queues
225 options
, todo
= queues
.popitem() # TODO: Should this use FIFO ordering?
226 self
.run_queue(options
, todo
)
229 def _finished_callback(self
, batch_fut
, todo
):
230 """Passes exception along.
233 batch_fut: the batch future returned by running todo_tasklet.
234 todo: (fut, option) pair. fut is the future return by each add() call.
236 If the batch fut was successful, it has already called fut.set_result()
237 on other individual futs. This method only handles when the batch fut
238 encountered an exception.
240 self
._running
.remove(batch_fut
)
241 err
= batch_fut
.get_exception()
243 tb
= batch_fut
.get_traceback()
244 for (fut
, _
) in todo
:
246 fut
.set_exception(err
, tb
)
250 while self
._running
or self
.action():
252 yield self
._running
# A list of Futures
255 class Context(object):
257 def __init__(self
, conn
=None, auto_batcher_class
=AutoBatcher
, config
=None,
258 parent_context
=None):
259 # NOTE: If conn is not None, config is only used to get the
260 # auto-batcher limits.
262 conn
= model
.make_connection(config
)
264 self
._auto
_batcher
_class
= auto_batcher_class
265 self
._parent
_context
= parent_context
# For transaction nesting.
266 # Get the get/put/delete limits (defaults 1000, 500, 500).
267 # Note that the explicit config passed in overrides the config
268 # attached to the connection, if it was passed in.
269 max_get
= (datastore_rpc
.Configuration
.max_get_keys(config
, conn
.config
) or
270 datastore_rpc
.Connection
.MAX_GET_KEYS
)
271 max_put
= (datastore_rpc
.Configuration
.max_put_entities(config
,
273 datastore_rpc
.Connection
.MAX_PUT_ENTITIES
)
274 max_delete
= (datastore_rpc
.Configuration
.max_delete_keys(config
,
276 datastore_rpc
.Connection
.MAX_DELETE_KEYS
)
277 # Create the get/put/delete auto-batchers.
278 self
._get
_batcher
= auto_batcher_class(self
._get
_tasklet
, max_get
)
279 self
._put
_batcher
= auto_batcher_class(self
._put
_tasklet
, max_put
)
280 self
._delete
_batcher
= auto_batcher_class(self
._delete
_tasklet
, max_delete
)
281 # We only have a single limit for memcache (default 1000).
282 max_memcache
= (ContextOptions
.max_memcache_items(config
, conn
.config
) or
283 datastore_rpc
.Connection
.MAX_GET_KEYS
)
284 # Create the memcache auto-batchers.
285 self
._memcache
_get
_batcher
= auto_batcher_class(self
._memcache
_get
_tasklet
,
287 self
._memcache
_set
_batcher
= auto_batcher_class(self
._memcache
_set
_tasklet
,
289 self
._memcache
_del
_batcher
= auto_batcher_class(self
._memcache
_del
_tasklet
,
291 self
._memcache
_off
_batcher
= auto_batcher_class(self
._memcache
_off
_tasklet
,
293 # Create a list of batchers for flush().
294 self
._batchers
= [self
._get
_batcher
,
296 self
._delete
_batcher
,
297 self
._memcache
_get
_batcher
,
298 self
._memcache
_set
_batcher
,
299 self
._memcache
_del
_batcher
,
300 self
._memcache
_off
_batcher
,
303 self
._memcache
= memcache
.Client()
304 self
._on
_commit
_queue
= []
306 # NOTE: The default memcache prefix is altered if an incompatible change is
307 # required. Remember to check release notes when using a custom prefix.
308 _memcache_prefix
= 'NDB9:' # TODO: Might make this configurable.
312 # Rinse and repeat until all batchers are completely out of work.
315 yield [batcher
.flush() for batcher
in self
._batchers
]
317 for batcher
in self
._batchers
:
318 if batcher
._running
or batcher
._queues
:
323 def _get_tasklet(self
, todo
, options
):
325 raise RuntimeError('Nothing to do.')
326 # Make the datastore RPC call.
328 for unused_fut
, key
in todo
:
329 datastore_keys
.append(key
)
330 # Now wait for the datastore RPC(s) and pass the results to the futures.
331 entities
= yield self
._conn
.async_get(options
, datastore_keys
)
332 for ent
, (fut
, unused_key
) in zip(entities
, todo
):
336 def _put_tasklet(self
, todo
, options
):
338 raise RuntimeError('Nothing to do.')
339 # TODO: What if the same entity is being put twice?
340 # TODO: What if two entities with the same key are being put?
341 datastore_entities
= []
342 for unused_fut
, ent
in todo
:
343 datastore_entities
.append(ent
)
344 # Wait for datastore RPC(s).
345 keys
= yield self
._conn
.async_put(options
, datastore_entities
)
346 for key
, (fut
, ent
) in zip(keys
, todo
):
348 if ent
._has
_complete
_key
():
349 raise datastore_errors
.BadKeyError(
350 'Entity key differs from the one returned by the datastore. '
351 'Expected %r, got %r' % (key
, ent
._key
))
356 def _delete_tasklet(self
, todo
, options
):
358 raise RuntimeError('Nothing to do.')
361 for fut
, key
in todo
:
363 datastore_keys
.append(key
)
364 # Wait for datastore RPC(s).
365 yield self
._conn
.async_delete(options
, datastore_keys
)
366 # Send a dummy result to all original Futures.
370 # TODO: Unify the policy docstrings (they're getting too verbose).
372 # All the policy functions may also:
373 # - be a constant of the right type (instead of a function);
374 # - return None (instead of a value of the right type);
375 # - be None (instead of a function or constant).
377 # Model classes may define class variables or class methods
378 # _use_{cache,memcache,datastore} or _memcache_timeout to set the
379 # default policy of that type for that class.
382 def default_cache_policy(key
):
383 """Default cache policy.
385 This defers to _use_cache on the Model class.
395 modelclass
= model
.Model
._kind
_map
.get(key
.kind())
396 if modelclass
is not None:
397 policy
= getattr(modelclass
, '_use_cache', None)
398 if policy
is not None:
399 if isinstance(policy
, bool):
405 _cache_policy
= default_cache_policy
def get_cache_policy(self):
    """Return the current context cache policy function.

    Returns:
      A function that accepts a Key instance as argument and returns
      a bool indicating if it should be cached.  May be None.
    """
    policy = self._cache_policy
    return policy
416 def set_cache_policy(self
, func
):
417 """Set the context cache policy function.
420 func: A function that accepts a Key instance as argument and returns
421 a bool indicating if it should be cached. May be None.
424 func
= self
.default_cache_policy
425 elif isinstance(func
, bool):
426 func
= lambda unused_key
, flag
=func
: flag
427 self
._cache
_policy
= func
429 def _use_cache(self
, key
, options
=None):
430 """Return whether to use the context cache for this key.
434 options: ContextOptions instance, or None.
437 True if the key should be cached, False otherwise.
439 flag
= ContextOptions
.use_cache(options
)
441 flag
= self
._cache
_policy
(key
)
443 flag
= ContextOptions
.use_cache(self
._conn
.config
)
449 def default_memcache_policy(key
):
450 """Default memcache policy.
452 This defers to _use_memcache on the Model class.
462 modelclass
= model
.Model
._kind
_map
.get(key
.kind())
463 if modelclass
is not None:
464 policy
= getattr(modelclass
, '_use_memcache', None)
465 if policy
is not None:
466 if isinstance(policy
, bool):
472 _memcache_policy
= default_memcache_policy
def get_memcache_policy(self):
    """Return the current memcache policy function.

    Returns:
      A function that accepts a Key instance as argument and returns
      a bool indicating if it should be cached.  May be None.
    """
    policy = self._memcache_policy
    return policy
483 def set_memcache_policy(self
, func
):
484 """Set the memcache policy function.
487 func: A function that accepts a Key instance as argument and returns
488 a bool indicating if it should be cached. May be None.
491 func
= self
.default_memcache_policy
492 elif isinstance(func
, bool):
493 func
= lambda unused_key
, flag
=func
: flag
494 self
._memcache
_policy
= func
496 def _use_memcache(self
, key
, options
=None):
497 """Return whether to use memcache for this key.
501 options: ContextOptions instance, or None.
504 True if the key should be cached in memcache, False otherwise.
506 flag
= ContextOptions
.use_memcache(options
)
508 flag
= self
._memcache
_policy
(key
)
510 flag
= ContextOptions
.use_memcache(self
._conn
.config
)
516 def default_datastore_policy(key
):
517 """Default datastore policy.
519 This defers to _use_datastore on the Model class.
529 modelclass
= model
.Model
._kind
_map
.get(key
.kind())
530 if modelclass
is not None:
531 policy
= getattr(modelclass
, '_use_datastore', None)
532 if policy
is not None:
533 if isinstance(policy
, bool):
539 _datastore_policy
= default_datastore_policy
def get_datastore_policy(self):
    """Return the current context datastore policy function.

    Returns:
      A function that accepts a Key instance as argument and returns
      a bool indicating if it should use the datastore.  May be None.
    """
    policy = self._datastore_policy
    return policy
550 def set_datastore_policy(self
, func
):
551 """Set the context datastore policy function.
554 func: A function that accepts a Key instance as argument and returns
555 a bool indicating if it should use the datastore. May be None.
558 func
= self
.default_datastore_policy
559 elif isinstance(func
, bool):
560 func
= lambda unused_key
, flag
=func
: flag
561 self
._datastore
_policy
= func
563 def _use_datastore(self
, key
, options
=None):
564 """Return whether to use the datastore for this key.
568 options: ContextOptions instance, or None.
571 True if the datastore should be used, False otherwise.
573 flag
= ContextOptions
.use_datastore(options
)
575 flag
= self
._datastore
_policy
(key
)
577 flag
= ContextOptions
.use_datastore(self
._conn
.config
)
583 def default_memcache_timeout_policy(key
):
584 """Default memcache timeout policy.
586 This defers to _memcache_timeout on the Model class.
592 Memcache timeout to use (integer), or None.
595 if key
is not None and isinstance(key
, model
.Key
):
596 modelclass
= model
.Model
._kind
_map
.get(key
.kind())
597 if modelclass
is not None:
598 policy
= getattr(modelclass
, '_memcache_timeout', None)
599 if policy
is not None:
600 if isinstance(policy
, (int, long)):
603 timeout
= policy(key
)
606 _memcache_timeout_policy
= default_memcache_timeout_policy
608 def set_memcache_timeout_policy(self
, func
):
609 """Set the policy function for memcache timeout (expiration).
612 func: A function that accepts a key instance as argument and returns
613 an integer indicating the desired memcache timeout. May be None.
615 If the function returns 0 it implies the default timeout.
618 func
= self
.default_memcache_timeout_policy
619 elif isinstance(func
, (int, long)):
620 func
= lambda unused_key
, flag
=func
: flag
621 self
._memcache
_timeout
_policy
= func
def get_memcache_timeout_policy(self):
    """Return the current policy function for memcache timeout (expiration)."""
    policy = self._memcache_timeout_policy
    return policy
627 def _get_memcache_timeout(self
, key
, options
=None):
628 """Return the memcache timeout (expiration) for this key."""
629 timeout
= ContextOptions
.memcache_timeout(options
)
631 timeout
= self
._memcache
_timeout
_policy
(key
)
633 timeout
= ContextOptions
.memcache_timeout(self
._conn
.config
)
def _get_memcache_deadline(self, options=None):
    """Return the memcache RPC deadline.

    Not to be confused with the memcache timeout, or expiration.

    This is only used by datastore operations when using memcache
    as a cache; it is ignored by the direct memcache calls.

    There is no way to vary this per key or per entity; you must either
    set it on a specific call (e.g. key.get(memcache_deadline=1) or
    in the configuration options of the context's connection.
    """
    # A None result means the system default deadline (typically, 5) applies.
    deadline = ContextOptions.memcache_deadline(options, self._conn.config)
    return deadline
def _load_from_cache_if_available(self, key):
    """Returns a cached Model instance given the entity key if available.

    Args:
      key: Key instance.

    Returns:
      A Model instance if the key exists in the cache.
    """
    if key not in self._cache:
        return
    cached = self._cache[key]  # May be None, meaning "doesn't exist".
    if cached is None or cached._key == key:
        # If entity's key didn't change later, it is ok.
        # See issue 13.  http://goo.gl/jxjOP
        raise tasklets.Return(cached)
670 # TODO: What about conflicting requests to different autobatchers,
671 # e.g. tasklet A calls get() on a given key while tasklet B calls
672 # delete()? The outcome is nondeterministic, depending on which
673 # autobatcher gets run first. Maybe we should just flag such
674 # conflicts as errors, with an overridable policy to resolve them
678 def get(self
, key
, **ctx_options
):
679 """Return a Model instance given the entity key.
681 It will use the context cache if the cache policy for the given
686 **ctx_options: Context options.
689 A Model instance if the key exists in the datastore; None otherwise.
691 options
= _make_ctx_options(ctx_options
)
692 use_cache
= self
._use
_cache
(key
, options
)
694 self
._load
_from
_cache
_if
_available
(key
)
696 use_datastore
= self
._use
_datastore
(key
, options
)
697 if (use_datastore
and
698 isinstance(self
._conn
, datastore_rpc
.TransactionalConnection
)):
701 use_memcache
= self
._use
_memcache
(key
, options
)
703 memcache_deadline
= None # Avoid worries about uninitialized variable.
706 mkey
= self
._memcache
_prefix
+ key
.urlsafe()
707 memcache_deadline
= self
._get
_memcache
_deadline
(options
)
708 mvalue
= yield self
.memcache_get(mkey
, for_cas
=use_datastore
,
709 namespace
=ns
, use_cache
=True,
710 deadline
=memcache_deadline
)
711 # A value may have appeared while yielding.
713 self
._load
_from
_cache
_if
_available
(key
)
714 if mvalue
not in (_LOCKED
, None):
715 cls
= model
.Model
._lookup
_model
(key
.kind(),
716 self
._conn
.adapter
.default_model
)
717 pb
= entity_pb
.EntityProto()
720 pb
.MergePartialFromString(mvalue
)
721 except ProtocolBuffer
.ProtocolBufferDecodeError
:
722 logging
.warning('Corrupt memcache entry found '
723 'with key %s and namespace %s' % (mkey
, ns
))
726 entity
= cls
._from
_pb
(pb
)
727 # Store the key on the entity since it wasn't written to memcache.
730 # Update in-memory cache.
731 self
._cache
[key
] = entity
732 raise tasklets
.Return(entity
)
734 if mvalue
is None and use_datastore
:
735 yield self
.memcache_set(mkey
, _LOCKED
, time
=_LOCK_TIME
, namespace
=ns
,
736 use_cache
=True, deadline
=memcache_deadline
)
737 yield self
.memcache_gets(mkey
, namespace
=ns
, use_cache
=True,
738 deadline
=memcache_deadline
)
740 if not use_datastore
:
741 # NOTE: Do not cache this miss. In some scenarios this would
742 # prevent an app from working properly.
743 raise tasklets
.Return(None)
746 entity
= yield self
._get
_batcher
.add_once(key
, options
)
748 entity
= yield self
._get
_batcher
.add(key
, options
)
750 if entity
is not None:
751 if use_memcache
and mvalue
!= _LOCKED
:
752 # Don't serialize the key since it's already the memcache key.
753 pbs
= entity
._to
_pb
(set_key
=False).SerializePartialToString()
754 # Don't attempt to write to memcache if too big. Note that we
755 # use LBYL ("look before you leap") because a multi-value
756 # memcache operation would fail for all entities rather than
757 # for just the one that's too big. (Also, the AutoBatcher
758 # class doesn't pass back exceptions very well.)
759 if len(pbs
) <= memcache
.MAX_VALUE_SIZE
:
760 timeout
= self
._get
_memcache
_timeout
(key
, options
)
761 # Don't use fire-and-forget -- for users who forget
762 # @ndb.toplevel, it's too painful to diagnose why their simple
763 # code using a single synchronous call doesn't seem to use
764 # memcache. See issue 105. http://goo.gl/JQZxp
765 yield self
.memcache_cas(mkey
, pbs
, time
=timeout
, namespace
=ns
,
766 deadline
=memcache_deadline
)
769 # Cache hit or miss. NOTE: In this case it is okay to cache a
770 # miss; the datastore is the ultimate authority.
771 self
._cache
[key
] = entity
773 raise tasklets
.Return(entity
)
776 def put(self
, entity
, **ctx_options
):
777 options
= _make_ctx_options(ctx_options
)
778 # TODO: What if the same entity is being put twice?
779 # TODO: What if two entities with the same key are being put?
782 # Pass a dummy Key to _use_datastore().
783 key
= model
.Key(entity
.__class
__, None)
784 use_datastore
= self
._use
_datastore
(key
, options
)
786 memcache_deadline
= None # Avoid worries about uninitialized variable.
788 if entity
._has
_complete
_key
():
789 use_memcache
= self
._use
_memcache
(key
, options
)
791 # Wait for memcache operations before starting datastore RPCs.
792 memcache_deadline
= self
._get
_memcache
_deadline
(options
)
793 mkey
= self
._memcache
_prefix
+ key
.urlsafe()
796 yield self
.memcache_set(mkey
, _LOCKED
, time
=_LOCK_TIME
,
797 namespace
=ns
, use_cache
=True,
798 deadline
=memcache_deadline
)
800 pbs
= entity
._to
_pb
(set_key
=False).SerializePartialToString()
801 # If the byte string to be written is too long for memcache,
802 # raise ValueError. (See LBYL explanation in get().)
803 if len(pbs
) > memcache
.MAX_VALUE_SIZE
:
804 raise ValueError('Values may not be more than %d bytes in length; '
805 'received %d bytes' % (memcache
.MAX_VALUE_SIZE
,
807 timeout
= self
._get
_memcache
_timeout
(key
, options
)
808 yield self
.memcache_set(mkey
, pbs
, time
=timeout
, namespace
=ns
,
809 deadline
=memcache_deadline
)
812 key
= yield self
._put
_batcher
.add(entity
, options
)
813 if not isinstance(self
._conn
, datastore_rpc
.TransactionalConnection
):
814 if use_memcache
is None:
815 use_memcache
= self
._use
_memcache
(key
, options
)
817 mkey
= self
._memcache
_prefix
+ key
.urlsafe()
819 # Don't use fire-and-forget -- see memcache_cas() in get().
820 yield self
.memcache_delete(mkey
, namespace
=ns
,
821 deadline
=memcache_deadline
)
824 if entity
._key
!= key
:
825 logging
.info('replacing key %s with %s', entity
._key
, key
)
827 # TODO: For updated entities, could we update the cache first?
828 if self
._use
_cache
(key
, options
):
829 # TODO: What if by now the entity is already in the cache?
830 self
._cache
[key
] = entity
832 raise tasklets
.Return(key
)
835 def delete(self
, key
, **ctx_options
):
836 options
= _make_ctx_options(ctx_options
)
837 if self
._use
_memcache
(key
, options
):
838 memcache_deadline
= self
._get
_memcache
_deadline
(options
)
839 mkey
= self
._memcache
_prefix
+ key
.urlsafe()
841 # TODO: If not use_datastore, delete instead of lock?
842 yield self
.memcache_set(mkey
, _LOCKED
, time
=_LOCK_TIME
, namespace
=ns
,
843 use_cache
=True, deadline
=memcache_deadline
)
845 if self
._use
_datastore
(key
, options
):
846 yield self
._delete
_batcher
.add(key
, options
)
847 # TODO: Delete from memcache here?
849 if self
._use
_cache
(key
, options
):
850 self
._cache
[key
] = None
def allocate_ids(self, key, size=None, max=None, **ctx_options):
    """Allocate a range of datastore ids (tasklet; yields an RPC).

    Args:
      key: Key whose kind/parent determine the id space.
      size: Optional number of ids to allocate.
      max: Optional maximum id to allocate up to.  (The name shadows the
        builtin, but is kept for interface compatibility.)
      **ctx_options: Context options.

    Raises tasklets.Return with the (lo, hi) pair from the datastore.
    """
    options = _make_ctx_options(ctx_options)
    id_range = yield self._conn.async_allocate_ids(options, key, size, max)
    raise tasklets.Return(id_range)
def get_indexes(self, **ctx_options):
    """Fetch the application's datastore indexes (tasklet; yields an RPC).

    Args:
      **ctx_options: Context options.

    Raises tasklets.Return with the list of indexes.
    """
    options = _make_ctx_options(ctx_options)
    indexes = yield self._conn.async_get_indexes(options)
    raise tasklets.Return(indexes)
865 def map_query(self
, query
, callback
, pass_batch_into_callback
=None,
866 options
=None, merge_future
=None):
869 mfut
= tasklets
.MultiFuture('map_query')
874 inq
= tasklets
.SerialQueueFuture()
875 query
.run_to_queue(inq
, self
._conn
, options
)
878 batch
, i
, ent
= yield inq
.getq()
881 ent
= self
._update
_cache
_from
_query
_result
(ent
, options
)
887 # TODO: If the callback raises, log and ignore.
888 if pass_batch_into_callback
:
889 val
= callback(batch
, i
, ent
)
893 except GeneratorExit
:
895 except Exception, err
:
896 _
, _
, tb
= sys
.exc_info()
897 mfut
.set_exception(err
, tb
)
905 def _update_cache_from_query_result(self
, ent
, options
):
906 if isinstance(ent
, model
.Key
):
907 return ent
# It was a keys-only query and ent is really a Key.
909 return ent
# Never cache partial entities (projection query results).
911 if not self
._use
_cache
(key
, options
):
912 return ent
# This key should not be cached.
914 # Check the cache. If there is a valid cached entry, substitute
915 # that for the result, even if the cache has an explicit None.
916 if key
in self
._cache
:
917 cached_ent
= self
._cache
[key
]
918 if (cached_ent
is None or
919 cached_ent
.key
== key
and cached_ent
.__class
__ is ent
.__class
__):
923 self
._cache
[key
] = ent
927 def iter_query(self
, query
, callback
=None, pass_batch_into_callback
=None,
929 return self
.map_query(query
, callback
=callback
, options
=options
,
930 pass_batch_into_callback
=pass_batch_into_callback
,
931 merge_future
=tasklets
.SerialQueueFuture())
934 def transaction(self
, callback
, **ctx_options
):
935 # Will invoke callback() one or more times with the default
936 # context set to a new, transactional Context. Returns a Future.
937 # Callback may be a tasklet; in that case it will be waited on.
938 options
= _make_ctx_options(ctx_options
, TransactionOptions
)
939 propagation
= TransactionOptions
.propagation(options
)
940 if propagation
is None:
941 propagation
= TransactionOptions
.NESTED
944 if propagation
== TransactionOptions
.NESTED
:
945 if self
.in_transaction():
946 raise datastore_errors
.BadRequestError(
947 'Nested transactions are not supported.')
948 elif propagation
== TransactionOptions
.MANDATORY
:
949 if not self
.in_transaction():
950 raise datastore_errors
.BadRequestError(
951 'Requires an existing transaction.')
953 if isinstance(result
, tasklets
.Future
):
954 result
= yield result
955 raise tasklets
.Return(result
)
956 elif propagation
== TransactionOptions
.ALLOWED
:
957 if self
.in_transaction():
959 if isinstance(result
, tasklets
.Future
):
960 result
= yield result
961 raise tasklets
.Return(result
)
962 elif propagation
== TransactionOptions
.INDEPENDENT
:
963 while parent
.in_transaction():
964 parent
= parent
._parent
_context
966 raise datastore_errors
.BadRequestError(
967 'Context without non-transactional ancestor')
969 raise datastore_errors
.BadArgumentError(
970 'Invalid propagation value (%s).' % (propagation
,))
972 app
= TransactionOptions
.app(options
) or key_module
._DefaultAppId
()
973 # Note: zero retries means try it once.
974 retries
= TransactionOptions
.retries(options
)
978 for _
in xrange(1 + max(0, retries
)):
979 transaction
= yield parent
._conn
.async_begin_transaction(options
, app
)
980 tconn
= datastore_rpc
.TransactionalConnection(
981 adapter
=parent
._conn
.adapter
,
982 config
=parent
._conn
.config
,
983 transaction
=transaction
)
984 tctx
= parent
.__class
__(conn
=tconn
,
985 auto_batcher_class
=parent
._auto
_batcher
_class
,
986 parent_context
=parent
)
987 tctx
._old
_ds
_conn
= datastore
._GetConnection
()
990 # Copy memcache policies. Note that get() will never use
991 # memcache in a transaction, but put and delete should do their
992 # memcache thing (which is to mark the key as deleted for
993 # _LOCK_TIME seconds). Also note that the in-process cache and
994 # datastore policies keep their default (on) state.
995 tctx
.set_memcache_policy(parent
.get_memcache_policy())
996 tctx
.set_memcache_timeout_policy(parent
.get_memcache_timeout_policy())
997 tasklets
.set_context(tctx
)
998 datastore
._SetConnection
(tconn
) # For taskqueue coordination
1002 if isinstance(result
, tasklets
.Future
):
1003 result
= yield result
1006 except GeneratorExit
:
1009 t
, e
, tb
= sys
.exc_info()
1010 tconn
.async_rollback(options
) # Fire and forget.
1011 if issubclass(t
, datastore_errors
.Rollback
):
1012 # TODO: Raise value using tasklets.get_return_value(t)?
1017 ok
= yield tconn
.async_commit(options
)
1019 parent
._cache
.update(tctx
._cache
)
1020 yield parent
._clear
_memcache
(tctx
._cache
)
1021 raise tasklets
.Return(result
)
1022 # The finally clause will run the on-commit queue.
1024 datastore
._SetConnection
(tctx
._old
_ds
_conn
)
1025 del tctx
._old
_ds
_conn
1027 # Call the callbacks collected in the transaction context's
1028 # on-commit queue. If the transaction failed the queue is
1029 # abandoned. We must do this after the connection has been
1030 # restored, but we can't do it after the for-loop because we
1031 # leave it by raising tasklets.Return().
1032 for on_commit_callback
in tctx
._on
_commit
_queue
:
1033 on_commit_callback() # This better not raise.
1036 raise datastore_errors
.TransactionFailedError(
1037 'The transaction could not be committed. Please try again.')
def in_transaction(self):
    """Return whether a transaction is currently active."""
    # A transactional Context wraps a TransactionalConnection; that is the
    # only signal used here.
    conn = self._conn
    return isinstance(conn, datastore_rpc.TransactionalConnection)
def call_on_commit(self, callback):
  """Call a callback upon successful commit of a transaction.

  If not in a transaction, the callback is called immediately.

  In a transaction, multiple callbacks may be registered and will be
  called once the transaction commits, in the order in which they
  were registered.  If the transaction fails, the callbacks will not
  be called.

  If the callback raises an exception, it bubbles up normally.  This
  means: If the callback is called immediately, any exception it
  raises will bubble up immediately.  If the call is postponed until
  commit, remaining callbacks will be skipped and the exception will
  bubble up through the transaction() call.  (However, the
  transaction is already committed at that point.)

  Args:
    callback: A callable taking no arguments.
  """
  if self.in_transaction():
    # Defer: the transaction machinery drains this queue after commit.
    self._on_commit_queue.append(callback)
  else:
    # No transaction active -- run right away.
    callback()
def clear_cache(self):
  """Empty the Context's in-memory entity cache.

  NOTE: This does not affect memcache.
  """
  # Clear in place (not rebind) so any aliases of the dict see the change.
  self._cache.clear()
@tasklets.tasklet
def _clear_memcache(self, keys):
  """Delete the memcache entries corresponding to the given datastore keys.

  Only keys for which the memcache policy is enabled are touched; the
  deletes run concurrently and this tasklet waits for all of them.

  Args:
    keys: Iterable of datastore Key objects.
  """
  eligible = set(k for k in keys if self._use_memcache(k))
  futures = []
  for k in eligible:
    mkey = self._memcache_prefix + k.urlsafe()
    futures.append(self.memcache_delete(mkey, namespace=k.namespace()))
  # Yielding the list waits for every pending delete at once.
  yield futures
@tasklets.tasklet
def _memcache_get_tasklet(self, todo, options):
  """Batcher callback: issue one get_multi_async for a batch of gets.

  Args:
    todo: List of (future, key) pairs queued by the batcher.
    options: Tuple (for_cas, namespace, deadline) shared by the batch.
  """
  if not todo:
    raise RuntimeError('Nothing to do.')
  for_cas, namespace, deadline = options
  # Deduplicate keys; each future is resolved from the shared result dict.
  keys = set(k for _unused_fut, k in todo)
  rpc = memcache.create_rpc(deadline=deadline)
  results = yield self._memcache.get_multi_async(
      keys, for_cas=for_cas, namespace=namespace, rpc=rpc)
  for fut, k in todo:
    fut.set_result(results.get(k))
@tasklets.tasklet
def _memcache_set_tasklet(self, todo, options):
  """Batcher callback: one {set,add,replace,cas}_multi_async per batch.

  Each future receives True iff its key's status is STORED.

  Args:
    todo: List of (future, (key, value)) pairs queued by the batcher.
    options: Tuple (opname, time, namespace, deadline); opname selects
      which memcache *_multi_async method to dispatch to.
  """
  if not todo:
    raise RuntimeError('Nothing to do.')
  opname, time, namespace, deadline = options
  method = getattr(self._memcache, opname + '_multi_async')
  # Later duplicates of a key overwrite earlier ones, like the dict-fill loop.
  mapping = dict((k, v) for _unused_fut, (k, v) in todo)
  rpc = memcache.create_rpc(deadline=deadline)
  results = yield method(mapping, time=time, namespace=namespace, rpc=rpc)
  for fut, (k, _unused_value) in todo:
    if results is None:
      # Presumably the RPC failed outright -- report ERROR for every key.
      status = memcache.MemcacheSetResponse.ERROR
    else:
      status = results.get(k)
    fut.set_result(status == memcache.MemcacheSetResponse.STORED)
@tasklets.tasklet
def _memcache_del_tasklet(self, todo, options):
  """Batcher callback: issue one delete_multi_async for a batch of deletes.

  Args:
    todo: List of (future, key) pairs queued by the batcher.
    options: Tuple (seconds, namespace, deadline) shared by the batch.
  """
  if not todo:
    raise RuntimeError('Nothing to do.')
  seconds, namespace, deadline = options
  keys = set(k for _unused_fut, k in todo)
  rpc = memcache.create_rpc(deadline=deadline)
  statuses = yield self._memcache.delete_multi_async(
      keys, seconds=seconds, namespace=namespace, rpc=rpc)
  if statuses:  # On network error, statuses is None.
    # NOTE: iterating the same set object twice yields the same order,
    # so zip pairs each key with its own status.
    status_key_mapping = dict(zip(keys, statuses))
  else:
    status_key_mapping = {}
  for fut, k in todo:
    fut.set_result(
        status_key_mapping.get(k, memcache.DELETE_NETWORK_FAILURE))
@tasklets.tasklet
def _memcache_off_tasklet(self, todo, options):
  """Batcher callback: one offset_multi_async for a batch of incr/decr.

  Args:
    todo: List of (future, (key, delta)) pairs queued by the batcher.
    options: Tuple (initial_value, namespace, deadline) shared by the batch.
  """
  if not todo:
    raise RuntimeError('Nothing to do.')
  initial_value, namespace, deadline = options
  # {key: delta}; later duplicates overwrite earlier ones, as before.
  mapping = dict((k, delta) for _unused_fut, (k, delta) in todo)
  rpc = memcache.create_rpc(deadline=deadline)
  results = yield self._memcache.offset_multi_async(
      mapping, initial_value=initial_value, namespace=namespace, rpc=rpc)
  for fut, (k, _unused_delta) in todo:
    fut.set_result(results.get(k))
def memcache_get(self, key, for_cas=False, namespace=None, use_cache=False,
                 deadline=None):
  """An auto-batching wrapper for memcache.get() or .get_multi().

  Args:
    key: Key to get.  This must be a string; no prefix is applied.
    for_cas: If True, request and store CAS ids on the Context.
    namespace: Optional namespace.
    use_cache: If True, route through the batcher's add_once() path
      (presumably deduplicating identical pending requests -- confirm
      against AutoBatcher.add_once).
    deadline: Optional deadline for the RPC.

  Returns:
    A Future (!) whose return value is the value retrieved from
    memcache, or None.

  Raises:
    TypeError: If key is not a string or for_cas is not a bool.
  """
  # NOTE: The docstring previously said "Key to set." -- a copy/paste
  # error from memcache_set; this method performs a get.
  if not isinstance(key, basestring):
    raise TypeError('key must be a string; received %r' % key)
  if not isinstance(for_cas, bool):
    raise TypeError('for_cas must be a bool; received %r' % for_cas)
  if namespace is None:
    namespace = namespace_manager.get_namespace()
  options = (for_cas, namespace, deadline)
  batcher = self._memcache_get_batcher
  if use_cache:
    return batcher.add_once(key, options)
  else:
    return batcher.add(key, options)
# TODO: Add proper docstrings to the memcache_* wrapper methods below.
def memcache_gets(self, key, namespace=None, use_cache=False, deadline=None):
  """Like memcache_get(), but with for_cas=True so a CAS id is stored
  on the Context for a later memcache_cas() call.

  See memcache_get() for the remaining parameters.
  """
  kwargs = dict(for_cas=True, namespace=namespace,
                use_cache=use_cache, deadline=deadline)
  return self.memcache_get(key, **kwargs)
def memcache_set(self, key, value, time=0, namespace=None, use_cache=False,
                 deadline=None):
  """An auto-batching wrapper for memcache.set() / .set_multi().

  Args:
    key: Key to set.  This must be a string; no prefix is applied.
    value: Value to store.
    time: Expiration time argument passed through to memcache (default 0).
    namespace: Optional namespace; defaults to the current namespace.
    use_cache: If True, route through the batcher's add_once() path.
    deadline: Optional deadline for the RPC.

  Returns:
    A Future whose result is True iff the value was stored (STORED status).

  Raises:
    TypeError: If key is not a string or time is not an integer.
  """
  if not isinstance(key, basestring):
    raise TypeError('key must be a string; received %r' % key)
  if not isinstance(time, (int, long)):
    raise TypeError('time must be a number; received %r' % time)
  if namespace is None:
    namespace = namespace_manager.get_namespace()
  options = ('set', time, namespace, deadline)
  batcher = self._memcache_set_batcher
  if use_cache:
    return batcher.add_once((key, value), options)
  return batcher.add((key, value), options)
def memcache_add(self, key, value, time=0, namespace=None, deadline=None):
  """An auto-batching wrapper for memcache.add() / .add_multi().

  Dispatched through the set batcher with opname 'add' (the batcher
  resolves this to memcache add_multi_async).

  Args:
    key: Key to add.  This must be a string; no prefix is applied.
    value: Value to store.
    time: Expiration time argument passed through to memcache (default 0).
    namespace: Optional namespace; defaults to the current namespace.
    deadline: Optional deadline for the RPC.

  Returns:
    A Future whose result is True iff the value was stored.

  Raises:
    TypeError: If key is not a string or time is not an integer.
  """
  if not isinstance(key, basestring):
    raise TypeError('key must be a string; received %r' % key)
  if not isinstance(time, (int, long)):
    raise TypeError('time must be a number; received %r' % time)
  if namespace is None:
    namespace = namespace_manager.get_namespace()
  options = ('add', time, namespace, deadline)
  return self._memcache_set_batcher.add((key, value), options)
def memcache_replace(self, key, value, time=0, namespace=None, deadline=None):
  """An auto-batching wrapper for memcache.replace() / .replace_multi().

  Dispatched through the set batcher with opname 'replace'.

  Args:
    key: Key to replace.  This must be a string; no prefix is applied.
    value: Value to store.
    time: Expiration time argument passed through to memcache (default 0).
    namespace: Optional namespace; defaults to the current namespace.
    deadline: Optional deadline for the RPC.

  Returns:
    A Future whose result is True iff the value was stored.

  Raises:
    TypeError: If key is not a string or time is not an integer.
  """
  if not isinstance(key, basestring):
    raise TypeError('key must be a string; received %r' % key)
  if not isinstance(time, (int, long)):
    raise TypeError('time must be a number; received %r' % time)
  if namespace is None:
    namespace = namespace_manager.get_namespace()
  return self._memcache_set_batcher.add(
      (key, value), ('replace', time, namespace, deadline))
def memcache_cas(self, key, value, time=0, namespace=None, deadline=None):
  """An auto-batching wrapper for memcache compare-and-set.

  Dispatched through the set batcher with opname 'cas'; requires a prior
  memcache_gets() so a CAS id is held on the Context.

  Args:
    key: Key to update.  This must be a string; no prefix is applied.
    value: Value to store.
    time: Expiration time argument passed through to memcache (default 0).
    namespace: Optional namespace; defaults to the current namespace.
    deadline: Optional deadline for the RPC.

  Returns:
    A Future whose result is True iff the value was stored.

  Raises:
    TypeError: If key is not a string or time is not an integer.
  """
  if not isinstance(key, basestring):
    raise TypeError('key must be a string; received %r' % key)
  if not isinstance(time, (int, long)):
    raise TypeError('time must be a number; received %r' % time)
  if namespace is None:
    namespace = namespace_manager.get_namespace()
  options = ('cas', time, namespace, deadline)
  return self._memcache_set_batcher.add((key, value), options)
def memcache_delete(self, key, seconds=0, namespace=None, deadline=None):
  """An auto-batching wrapper for memcache.delete() / .delete_multi().

  Args:
    key: Key to delete.  This must be a string; no prefix is applied.
    seconds: Lock-out period argument passed through to memcache
      (default 0).
    namespace: Optional namespace; defaults to the current namespace.
    deadline: Optional deadline for the RPC.

  Returns:
    A Future whose result is the memcache delete status (or
    DELETE_NETWORK_FAILURE on RPC failure).

  Raises:
    TypeError: If key is not a string or seconds is not an integer.
  """
  if not isinstance(key, basestring):
    raise TypeError('key must be a string; received %r' % key)
  if not isinstance(seconds, (int, long)):
    raise TypeError('seconds must be a number; received %r' % seconds)
  if namespace is None:
    namespace = namespace_manager.get_namespace()
  options = (seconds, namespace, deadline)
  return self._memcache_del_batcher.add(key, options)
def memcache_incr(self, key, delta=1, initial_value=None, namespace=None,
                  deadline=None):
  """An auto-batching wrapper for memcache.incr() / .offset_multi().

  Args:
    key: Key to increment.  This must be a string; no prefix is applied.
    delta: Amount to add (default 1).
    initial_value: Optional starting value used if the key is absent;
      passed through to memcache.
    namespace: Optional namespace; defaults to the current namespace.
    deadline: Optional deadline for the RPC.

  Returns:
    A Future whose result is the per-key result of offset_multi (or None).

  Raises:
    TypeError: If key is not a string, delta is not an integer, or
      initial_value is neither an integer nor None.
  """
  if not isinstance(key, basestring):
    raise TypeError('key must be a string; received %r' % key)
  if not isinstance(delta, (int, long)):
    raise TypeError('delta must be a number; received %r' % delta)
  if initial_value is not None and not isinstance(initial_value, (int, long)):
    raise TypeError('initial_value must be a number or None; received %r' %
                    initial_value)
  if namespace is None:
    namespace = namespace_manager.get_namespace()
  options = (initial_value, namespace, deadline)
  return self._memcache_off_batcher.add((key, delta), options)
def memcache_decr(self, key, delta=1, initial_value=None, namespace=None,
                  deadline=None):
  """An auto-batching wrapper for memcache.decr() / .offset_multi().

  Implemented as an offset of -delta through the same batcher as
  memcache_incr().

  Args:
    key: Key to decrement.  This must be a string; no prefix is applied.
    delta: Amount to subtract (default 1).
    initial_value: Optional starting value used if the key is absent;
      passed through to memcache.
    namespace: Optional namespace; defaults to the current namespace.
    deadline: Optional deadline for the RPC.

  Returns:
    A Future whose result is the per-key result of offset_multi (or None).

  Raises:
    TypeError: If key is not a string, delta is not an integer, or
      initial_value is neither an integer nor None.
  """
  if not isinstance(key, basestring):
    raise TypeError('key must be a string; received %r' % key)
  if not isinstance(delta, (int, long)):
    raise TypeError('delta must be a number; received %r' % delta)
  if initial_value is not None and not isinstance(initial_value, (int, long)):
    raise TypeError('initial_value must be a number or None; received %r' %
                    initial_value)
  if namespace is None:
    namespace = namespace_manager.get_namespace()
  options = (initial_value, namespace, deadline)
  return self._memcache_off_batcher.add((key, -delta), options)
@tasklets.tasklet
def urlfetch(self, url, payload=None, method='GET', headers=None,
             allow_truncated=False, follow_redirects=True,
             validate_certificate=None, deadline=None, callback=None):
  """Asynchronously fetch a URL via the App Engine urlfetch service.

  All arguments are passed through to urlfetch.make_fetch_call(); see
  that API for their semantics.

  Args:
    url: The URL to fetch.
    payload: Optional request body.
    method: HTTP method (default 'GET').
    headers: Optional dict of HTTP headers; a fresh empty dict is used
      per call when None.  (Previously this defaulted to a shared
      mutable `{}` -- the classic mutable-default-argument pitfall;
      passing None is backward compatible with omitting the argument.)
    allow_truncated: Passed through to make_fetch_call().
    follow_redirects: Passed through to make_fetch_call().
    validate_certificate: Passed through to make_fetch_call().
    deadline: Optional RPC deadline.
    callback: Optional RPC completion callback.

  Returns:
    (via tasklets.Return) the completed urlfetch RPC result.
  """
  if headers is None:
    headers = {}  # Fresh dict per call; never share a default mutable.
  rpc = urlfetch.create_rpc(deadline=deadline, callback=callback)
  urlfetch.make_fetch_call(rpc, url,
                           payload=payload,
                           method=method,
                           headers=headers,
                           allow_truncated=allow_truncated,
                           follow_redirects=follow_redirects,
                           validate_certificate=validate_certificate)
  result = yield rpc
  raise tasklets.Return(result)