App Engine Python SDK version 1.9.3
[gae.git] / python / google / appengine / ext / ndb / context.py
blob9f5d12d9f5fd668a5b17d6ffb4c76e4007dafc3f
1 """Context class."""
3 import logging
4 import sys
6 from .google_imports import datastore # For taskqueue coordination
7 from .google_imports import datastore_errors
8 from .google_imports import memcache
9 from .google_imports import namespace_manager
10 from .google_imports import urlfetch
11 from .google_imports import datastore_rpc
12 from .google_imports import entity_pb
14 from .google_imports import ProtocolBuffer
16 from . import key as key_module
17 from . import model
18 from . import tasklets
19 from . import eventloop
20 from . import utils
__all__ = ['Context', 'ContextOptions', 'TransactionOptions', 'AutoBatcher',
           'EVENTUAL_CONSISTENCY',
           ]

_LOCK_TIME = 32  # Time to lock out memcache.add() after datastore updates.
_LOCKED = 0  # Special value to store in memcache indicating locked value.

# Constant for read_policy.
EVENTUAL_CONSISTENCY = datastore_rpc.Configuration.EVENTUAL_CONSISTENCY
class ContextOptions(datastore_rpc.Configuration):
  """Configuration options that may be passed along with get/put/delete.

  Each option is a ConfigOption validator: it checks the supplied value's
  type and returns it, raising BadArgumentError for wrong types.
  """

  @datastore_rpc.ConfigOption
  def use_cache(value):
    """Whether to consult/update the in-process (context) cache (bool)."""
    if not isinstance(value, bool):
      raise datastore_errors.BadArgumentError(
          'use_cache should be a bool (%r)' % (value,))
    return value

  @datastore_rpc.ConfigOption
  def use_memcache(value):
    """Whether to consult/update memcache (bool)."""
    if not isinstance(value, bool):
      raise datastore_errors.BadArgumentError(
          'use_memcache should be a bool (%r)' % (value,))
    return value

  @datastore_rpc.ConfigOption
  def use_datastore(value):
    """Whether to use the datastore at all (bool)."""
    if not isinstance(value, bool):
      raise datastore_errors.BadArgumentError(
          'use_datastore should be a bool (%r)' % (value,))
    return value

  @datastore_rpc.ConfigOption
  def memcache_timeout(value):
    """Memcache expiration time in seconds (integer)."""
    if not isinstance(value, (int, long)):
      raise datastore_errors.BadArgumentError(
          'memcache_timeout should be an integer (%r)' % (value,))
    return value

  @datastore_rpc.ConfigOption
  def max_memcache_items(value):
    """Batch size limit for the memcache auto-batchers (integer)."""
    if not isinstance(value, (int, long)):
      raise datastore_errors.BadArgumentError(
          'max_memcache_items should be an integer (%r)' % (value,))
    return value

  @datastore_rpc.ConfigOption
  def memcache_deadline(value):
    """RPC deadline for memcache calls in seconds (integer)."""
    if not isinstance(value, (int, long)):
      raise datastore_errors.BadArgumentError(
          'memcache_deadline should be an integer (%r)' % (value,))
    return value
class TransactionOptions(ContextOptions, datastore_rpc.TransactionOptions):
  """Support both context options and transaction options."""
84 # options and config can be used interchangeably.
85 _OPTION_TRANSLATIONS = {
86 'options': 'config',
def _make_ctx_options(ctx_options, config_cls=ContextOptions):
  """Helper to construct a ContextOptions object from keyword arguments.

  Args:
    ctx_options: A dict of keyword arguments.
    config_cls: Optional Configuration class to use, default ContextOptions.

  Note that either 'options' or 'config' can be used to pass another
  Configuration object, but not both.  If another Configuration
  object is given it provides default values.

  Returns:
    A Configuration object, or None if ctx_options is empty.

  Raises:
    ValueError: If both an alias (e.g. 'options') and its translation
      (e.g. 'config') are given at the same time.
  """
  if not ctx_options:
    return None
  # Iterate over a copy of the keys since we mutate the dict in the loop.
  for key in list(ctx_options):
    translation = _OPTION_TRANSLATIONS.get(key)
    if translation:
      if translation in ctx_options:
        raise ValueError('Cannot specify %s and %s at the same time' %
                         (key, translation))
      ctx_options[translation] = ctx_options.pop(key)
  return config_cls(**ctx_options)
class AutoBatcher(object):
  """Batches multiple async calls if they share the same rpc options.

  Here is an example to explain what this class does.

  Life of a key.get_async(options) API call:
  *) Key gets the singleton Context instance and invokes Context.get.
  *) Context.get calls Context._get_batcher.add(key, options).  This
     returns a future "fut" as the return value of key.get_async.
     At this moment, key.get_async returns.

  *) When more than "limit" number of _get_batcher.add() was called,
     _get_batcher invokes its self._todo_tasklet, Context._get_tasklet,
     with the list of keys seen so far.
  *) Context._get_tasklet fires a MultiRPC and waits on it.
  *) Upon MultiRPC completion, Context._get_tasklet passes on the results
     to the respective "fut" from key.get_async.

  *) If user calls "fut".get_result() before "limit" number of add() was
     called, "fut".get_result() will repeatedly call eventloop.run1().
  *) After processing immediate callbacks, eventloop will run idlers.
     AutoBatcher._on_idle is an idler.
  *) _on_idle will run the "todo_tasklet" before the batch is full.

  So the engine is todo_tasklet, which is a proxy tasklet that can combine
  arguments into batches and passes along results back to respective futures.
  This class is mainly a helper that invokes todo_tasklet with the right
  arguments at the right time.
  """

  def __init__(self, todo_tasklet, limit):
    """Init.

    Args:
      todo_tasklet: the tasklet that actually fires RPC and waits on a
        MultiRPC.  It should take a list of (future, arg) pairs and an
        "options" as arguments.  "options" are rpc options.
      limit: max number of items to batch for each distinct value of
        "options".
    """
    self._todo_tasklet = todo_tasklet
    self._limit = limit
    # A map from "options" to a list of (future, arg) tuple.
    # future is the future return from a single async operations.
    self._queues = {}
    self._running = []  # A list of in-flight todo_tasklet futures.
    self._cache = {}  # Cache of in-flight todo_tasklet futures.

  def __repr__(self):
    return '%s(%s)' % (self.__class__.__name__, self._todo_tasklet.__name__)

  def run_queue(self, options, todo):
    """Actually run the _todo_tasklet."""
    utils.logging_debug('AutoBatcher(%s): %d items',
                        self._todo_tasklet.__name__, len(todo))
    batch_fut = self._todo_tasklet(todo, options)
    self._running.append(batch_fut)
    # Add a callback when we're done.
    batch_fut.add_callback(self._finished_callback, batch_fut, todo)

  def _on_idle(self):
    """An idler eventloop can run.

    Eventloop calls this when it has finished processing all immediate
    callbacks.  This method runs _todo_tasklet even before the batch is
    full.
    """
    if not self.action():
      return None
    return True

  def add(self, arg, options=None):
    """Adds an arg and gets back a future.

    Args:
      arg: one argument for _todo_tasklet.
      options: rpc options.

    Return:
      An instance of future, representing the result of running
      _todo_tasklet without batching.
    """
    fut = tasklets.Future('%s.add(%s, %s)' % (self, arg, options))
    todo = self._queues.get(options)
    if todo is None:
      utils.logging_debug('AutoBatcher(%s): creating new queue for %r',
                          self._todo_tasklet.__name__, options)
      if not self._queues:
        # First pending queue: register the idler so it gets drained even
        # if the batch never fills up.
        eventloop.add_idle(self._on_idle)
      todo = self._queues[options] = []
    todo.append((fut, arg))
    if len(todo) >= self._limit:
      # Batch is full; fire it immediately.
      del self._queues[options]
      self.run_queue(options, todo)
    return fut

  def add_once(self, arg, options=None):
    """Like add(), but returns the same future for duplicate in-flight args."""
    cache_key = (arg, options)
    fut = self._cache.get(cache_key)
    if fut is None:
      fut = self.add(arg, options)
      self._cache[cache_key] = fut
      # Drop the cache entry as soon as the future completes.
      fut.add_immediate_callback(self._cache.__delitem__, cache_key)
    return fut

  def action(self):
    """Run one pending queue, if any.  Returns True if work was done."""
    queues = self._queues
    if not queues:
      return False
    options, todo = queues.popitem()  # TODO: Should this use FIFO ordering?
    self.run_queue(options, todo)
    return True

  def _finished_callback(self, batch_fut, todo):
    """Passes exception along.

    Args:
      batch_fut: the batch future returned by running todo_tasklet.
      todo: (fut, option) pair. fut is the future return by each add() call.

    If the batch fut was successful, it has already called fut.set_result()
    on other individual futs.  This method only handles when the batch fut
    encountered an exception.
    """
    self._running.remove(batch_fut)
    err = batch_fut.get_exception()
    if err is not None:
      tb = batch_fut.get_traceback()
      for (fut, _) in todo:
        if not fut.done():
          fut.set_exception(err, tb)

  @tasklets.tasklet
  def flush(self):
    """Wait for all in-flight and pending work to finish."""
    while self._running or self.action():
      if self._running:
        yield self._running  # A list of Futures
255 class Context(object):
257 def __init__(self, conn=None, auto_batcher_class=AutoBatcher, config=None,
258 parent_context=None):
259 # NOTE: If conn is not None, config is only used to get the
260 # auto-batcher limits.
261 if conn is None:
262 conn = model.make_connection(config)
263 self._conn = conn
264 self._auto_batcher_class = auto_batcher_class
265 self._parent_context = parent_context # For transaction nesting.
266 # Get the get/put/delete limits (defaults 1000, 500, 500).
267 # Note that the explicit config passed in overrides the config
268 # attached to the connection, if it was passed in.
269 max_get = (datastore_rpc.Configuration.max_get_keys(config, conn.config) or
270 datastore_rpc.Connection.MAX_GET_KEYS)
271 max_put = (datastore_rpc.Configuration.max_put_entities(config,
272 conn.config) or
273 datastore_rpc.Connection.MAX_PUT_ENTITIES)
274 max_delete = (datastore_rpc.Configuration.max_delete_keys(config,
275 conn.config) or
276 datastore_rpc.Connection.MAX_DELETE_KEYS)
277 # Create the get/put/delete auto-batchers.
278 self._get_batcher = auto_batcher_class(self._get_tasklet, max_get)
279 self._put_batcher = auto_batcher_class(self._put_tasklet, max_put)
280 self._delete_batcher = auto_batcher_class(self._delete_tasklet, max_delete)
281 # We only have a single limit for memcache (default 1000).
282 max_memcache = (ContextOptions.max_memcache_items(config, conn.config) or
283 datastore_rpc.Connection.MAX_GET_KEYS)
284 # Create the memcache auto-batchers.
285 self._memcache_get_batcher = auto_batcher_class(self._memcache_get_tasklet,
286 max_memcache)
287 self._memcache_set_batcher = auto_batcher_class(self._memcache_set_tasklet,
288 max_memcache)
289 self._memcache_del_batcher = auto_batcher_class(self._memcache_del_tasklet,
290 max_memcache)
291 self._memcache_off_batcher = auto_batcher_class(self._memcache_off_tasklet,
292 max_memcache)
293 # Create a list of batchers for flush().
294 self._batchers = [self._get_batcher,
295 self._put_batcher,
296 self._delete_batcher,
297 self._memcache_get_batcher,
298 self._memcache_set_batcher,
299 self._memcache_del_batcher,
300 self._memcache_off_batcher,
302 self._cache = {}
303 self._memcache = memcache.Client()
304 self._on_commit_queue = []
306 # NOTE: The default memcache prefix is altered if an incompatible change is
307 # required. Remember to check release notes when using a custom prefix.
308 _memcache_prefix = 'NDB9:' # TODO: Might make this configurable.
310 @tasklets.tasklet
311 def flush(self):
312 # Rinse and repeat until all batchers are completely out of work.
313 more = True
314 while more:
315 yield [batcher.flush() for batcher in self._batchers]
316 more = False
317 for batcher in self._batchers:
318 if batcher._running or batcher._queues:
319 more = True
320 break
322 @tasklets.tasklet
323 def _get_tasklet(self, todo, options):
324 if not todo:
325 raise RuntimeError('Nothing to do.')
326 # Make the datastore RPC call.
327 datastore_keys = []
328 for unused_fut, key in todo:
329 datastore_keys.append(key)
330 # Now wait for the datastore RPC(s) and pass the results to the futures.
331 entities = yield self._conn.async_get(options, datastore_keys)
332 for ent, (fut, unused_key) in zip(entities, todo):
333 fut.set_result(ent)
335 @tasklets.tasklet
336 def _put_tasklet(self, todo, options):
337 if not todo:
338 raise RuntimeError('Nothing to do.')
339 # TODO: What if the same entity is being put twice?
340 # TODO: What if two entities with the same key are being put?
341 datastore_entities = []
342 for unused_fut, ent in todo:
343 datastore_entities.append(ent)
344 # Wait for datastore RPC(s).
345 keys = yield self._conn.async_put(options, datastore_entities)
346 for key, (fut, ent) in zip(keys, todo):
347 if key != ent._key:
348 if ent._has_complete_key():
349 raise datastore_errors.BadKeyError(
350 'Entity key differs from the one returned by the datastore. '
351 'Expected %r, got %r' % (key, ent._key))
352 ent._key = key
353 fut.set_result(key)
355 @tasklets.tasklet
356 def _delete_tasklet(self, todo, options):
357 if not todo:
358 raise RuntimeError('Nothing to do.')
359 futures = []
360 datastore_keys = []
361 for fut, key in todo:
362 futures.append(fut)
363 datastore_keys.append(key)
364 # Wait for datastore RPC(s).
365 yield self._conn.async_delete(options, datastore_keys)
366 # Send a dummy result to all original Futures.
367 for fut in futures:
368 fut.set_result(None)
  # TODO: Unify the policy docstrings (they're getting too verbose).

  # All the policy functions may also:
  # - be a constant of the right type (instead of a function);
  # - return None (instead of a value of the right type);
  # - be None (instead of a function or constant).

  # Model classes may define class variables or class methods
  # _use_{cache,memcache,datastore} or _memcache_timeout to set the
  # default policy of that type for that class.
381 @staticmethod
382 def default_cache_policy(key):
383 """Default cache policy.
385 This defers to _use_cache on the Model class.
387 Args:
388 key: Key instance.
390 Returns:
391 A bool or None.
393 flag = None
394 if key is not None:
395 modelclass = model.Model._kind_map.get(key.kind())
396 if modelclass is not None:
397 policy = getattr(modelclass, '_use_cache', None)
398 if policy is not None:
399 if isinstance(policy, bool):
400 flag = policy
401 else:
402 flag = policy(key)
403 return flag
405 _cache_policy = default_cache_policy
407 def get_cache_policy(self):
408 """Return the current context cache policy function.
410 Returns:
411 A function that accepts a Key instance as argument and returns
412 a bool indicating if it should be cached. May be None.
414 return self._cache_policy
416 def set_cache_policy(self, func):
417 """Set the context cache policy function.
419 Args:
420 func: A function that accepts a Key instance as argument and returns
421 a bool indicating if it should be cached. May be None.
423 if func is None:
424 func = self.default_cache_policy
425 elif isinstance(func, bool):
426 func = lambda unused_key, flag=func: flag
427 self._cache_policy = func
429 def _use_cache(self, key, options=None):
430 """Return whether to use the context cache for this key.
432 Args:
433 key: Key instance.
434 options: ContextOptions instance, or None.
436 Returns:
437 True if the key should be cached, False otherwise.
439 flag = ContextOptions.use_cache(options)
440 if flag is None:
441 flag = self._cache_policy(key)
442 if flag is None:
443 flag = ContextOptions.use_cache(self._conn.config)
444 if flag is None:
445 flag = True
446 return flag
448 @staticmethod
449 def default_memcache_policy(key):
450 """Default memcache policy.
452 This defers to _use_memcache on the Model class.
454 Args:
455 key: Key instance.
457 Returns:
458 A bool or None.
460 flag = None
461 if key is not None:
462 modelclass = model.Model._kind_map.get(key.kind())
463 if modelclass is not None:
464 policy = getattr(modelclass, '_use_memcache', None)
465 if policy is not None:
466 if isinstance(policy, bool):
467 flag = policy
468 else:
469 flag = policy(key)
470 return flag
472 _memcache_policy = default_memcache_policy
474 def get_memcache_policy(self):
475 """Return the current memcache policy function.
477 Returns:
478 A function that accepts a Key instance as argument and returns
479 a bool indicating if it should be cached. May be None.
481 return self._memcache_policy
483 def set_memcache_policy(self, func):
484 """Set the memcache policy function.
486 Args:
487 func: A function that accepts a Key instance as argument and returns
488 a bool indicating if it should be cached. May be None.
490 if func is None:
491 func = self.default_memcache_policy
492 elif isinstance(func, bool):
493 func = lambda unused_key, flag=func: flag
494 self._memcache_policy = func
496 def _use_memcache(self, key, options=None):
497 """Return whether to use memcache for this key.
499 Args:
500 key: Key instance.
501 options: ContextOptions instance, or None.
503 Returns:
504 True if the key should be cached in memcache, False otherwise.
506 flag = ContextOptions.use_memcache(options)
507 if flag is None:
508 flag = self._memcache_policy(key)
509 if flag is None:
510 flag = ContextOptions.use_memcache(self._conn.config)
511 if flag is None:
512 flag = True
513 return flag
515 @staticmethod
516 def default_datastore_policy(key):
517 """Default datastore policy.
519 This defers to _use_datastore on the Model class.
521 Args:
522 key: Key instance.
524 Returns:
525 A bool or None.
527 flag = None
528 if key is not None:
529 modelclass = model.Model._kind_map.get(key.kind())
530 if modelclass is not None:
531 policy = getattr(modelclass, '_use_datastore', None)
532 if policy is not None:
533 if isinstance(policy, bool):
534 flag = policy
535 else:
536 flag = policy(key)
537 return flag
539 _datastore_policy = default_datastore_policy
541 def get_datastore_policy(self):
542 """Return the current context datastore policy function.
544 Returns:
545 A function that accepts a Key instance as argument and returns
546 a bool indicating if it should use the datastore. May be None.
548 return self._datastore_policy
550 def set_datastore_policy(self, func):
551 """Set the context datastore policy function.
553 Args:
554 func: A function that accepts a Key instance as argument and returns
555 a bool indicating if it should use the datastore. May be None.
557 if func is None:
558 func = self.default_datastore_policy
559 elif isinstance(func, bool):
560 func = lambda unused_key, flag=func: flag
561 self._datastore_policy = func
563 def _use_datastore(self, key, options=None):
564 """Return whether to use the datastore for this key.
566 Args:
567 key: Key instance.
568 options: ContextOptions instance, or None.
570 Returns:
571 True if the datastore should be used, False otherwise.
573 flag = ContextOptions.use_datastore(options)
574 if flag is None:
575 flag = self._datastore_policy(key)
576 if flag is None:
577 flag = ContextOptions.use_datastore(self._conn.config)
578 if flag is None:
579 flag = True
580 return flag
582 @staticmethod
583 def default_memcache_timeout_policy(key):
584 """Default memcache timeout policy.
586 This defers to _memcache_timeout on the Model class.
588 Args:
589 key: Key instance.
591 Returns:
592 Memcache timeout to use (integer), or None.
594 timeout = None
595 if key is not None and isinstance(key, model.Key):
596 modelclass = model.Model._kind_map.get(key.kind())
597 if modelclass is not None:
598 policy = getattr(modelclass, '_memcache_timeout', None)
599 if policy is not None:
600 if isinstance(policy, (int, long)):
601 timeout = policy
602 else:
603 timeout = policy(key)
604 return timeout
606 _memcache_timeout_policy = default_memcache_timeout_policy
608 def set_memcache_timeout_policy(self, func):
609 """Set the policy function for memcache timeout (expiration).
611 Args:
612 func: A function that accepts a key instance as argument and returns
613 an integer indicating the desired memcache timeout. May be None.
615 If the function returns 0 it implies the default timeout.
617 if func is None:
618 func = self.default_memcache_timeout_policy
619 elif isinstance(func, (int, long)):
620 func = lambda unused_key, flag=func: flag
621 self._memcache_timeout_policy = func
623 def get_memcache_timeout_policy(self):
624 """Return the current policy function for memcache timeout (expiration)."""
625 return self._memcache_timeout_policy
627 def _get_memcache_timeout(self, key, options=None):
628 """Return the memcache timeout (expiration) for this key."""
629 timeout = ContextOptions.memcache_timeout(options)
630 if timeout is None:
631 timeout = self._memcache_timeout_policy(key)
632 if timeout is None:
633 timeout = ContextOptions.memcache_timeout(self._conn.config)
634 if timeout is None:
635 timeout = 0
636 return timeout
638 def _get_memcache_deadline(self, options=None):
639 """Return the memcache RPC deadline.
641 Not to be confused with the memcache timeout, or expiration.
643 This is only used by datastore operations when using memcache
644 as a cache; it is ignored by the direct memcache calls.
646 There is no way to vary this per key or per entity; you must either
647 set it on a specific call (e.g. key.get(memcache_deadline=1) or
648 in the configuration options of the context's connection.
650 # If this returns None, the system default (typically, 5) will apply.
651 return ContextOptions.memcache_deadline(options, self._conn.config)
654 def _load_from_cache_if_available(self, key):
655 """Returns a cached Model instance given the entity key if available.
657 Args:
658 key: Key instance.
660 Returns:
661 A Model instance if the key exists in the cache.
663 if key in self._cache:
664 entity = self._cache[key] # May be None, meaning "doesn't exist".
665 if entity is None or entity._key == key:
666 # If entity's key didn't change later, it is ok.
667 # See issue 13. http://goo.gl/jxjOP
668 raise tasklets.Return(entity)
  # TODO: What about conflicting requests to different autobatchers,
  # e.g. tasklet A calls get() on a given key while tasklet B calls
  # delete()?  The outcome is nondeterministic, depending on which
  # autobatcher gets run first.  Maybe we should just flag such
  # conflicts as errors, with an overridable policy to resolve them
  # differently?
677 @tasklets.tasklet
678 def get(self, key, **ctx_options):
679 """Return a Model instance given the entity key.
681 It will use the context cache if the cache policy for the given
682 key is enabled.
684 Args:
685 key: Key instance.
686 **ctx_options: Context options.
688 Returns:
689 A Model instance if the key exists in the datastore; None otherwise.
691 options = _make_ctx_options(ctx_options)
692 use_cache = self._use_cache(key, options)
693 if use_cache:
694 self._load_from_cache_if_available(key)
696 use_datastore = self._use_datastore(key, options)
697 if (use_datastore and
698 isinstance(self._conn, datastore_rpc.TransactionalConnection)):
699 use_memcache = False
700 else:
701 use_memcache = self._use_memcache(key, options)
702 ns = key.namespace()
703 memcache_deadline = None # Avoid worries about uninitialized variable.
705 if use_memcache:
706 mkey = self._memcache_prefix + key.urlsafe()
707 memcache_deadline = self._get_memcache_deadline(options)
708 mvalue = yield self.memcache_get(mkey, for_cas=use_datastore,
709 namespace=ns, use_cache=True,
710 deadline=memcache_deadline)
711 # A value may have appeared while yielding.
712 if use_cache:
713 self._load_from_cache_if_available(key)
714 if mvalue not in (_LOCKED, None):
715 cls = model.Model._lookup_model(key.kind(),
716 self._conn.adapter.default_model)
717 pb = entity_pb.EntityProto()
719 try:
720 pb.MergePartialFromString(mvalue)
721 except ProtocolBuffer.ProtocolBufferDecodeError:
722 logging.warning('Corrupt memcache entry found '
723 'with key %s and namespace %s' % (mkey, ns))
724 mvalue = None
725 else:
726 entity = cls._from_pb(pb)
727 # Store the key on the entity since it wasn't written to memcache.
728 entity._key = key
729 if use_cache:
730 # Update in-memory cache.
731 self._cache[key] = entity
732 raise tasklets.Return(entity)
734 if mvalue is None and use_datastore:
735 yield self.memcache_set(mkey, _LOCKED, time=_LOCK_TIME, namespace=ns,
736 use_cache=True, deadline=memcache_deadline)
737 yield self.memcache_gets(mkey, namespace=ns, use_cache=True,
738 deadline=memcache_deadline)
740 if not use_datastore:
741 # NOTE: Do not cache this miss. In some scenarios this would
742 # prevent an app from working properly.
743 raise tasklets.Return(None)
745 if use_cache:
746 entity = yield self._get_batcher.add_once(key, options)
747 else:
748 entity = yield self._get_batcher.add(key, options)
750 if entity is not None:
751 if use_memcache and mvalue != _LOCKED:
752 # Don't serialize the key since it's already the memcache key.
753 pbs = entity._to_pb(set_key=False).SerializePartialToString()
754 # Don't attempt to write to memcache if too big. Note that we
755 # use LBYL ("look before you leap") because a multi-value
756 # memcache operation would fail for all entities rather than
757 # for just the one that's too big. (Also, the AutoBatcher
758 # class doesn't pass back exceptions very well.)
759 if len(pbs) <= memcache.MAX_VALUE_SIZE:
760 timeout = self._get_memcache_timeout(key, options)
761 # Don't use fire-and-forget -- for users who forget
762 # @ndb.toplevel, it's too painful to diagnose why their simple
763 # code using a single synchronous call doesn't seem to use
764 # memcache. See issue 105. http://goo.gl/JQZxp
765 yield self.memcache_cas(mkey, pbs, time=timeout, namespace=ns,
766 deadline=memcache_deadline)
768 if use_cache:
769 # Cache hit or miss. NOTE: In this case it is okay to cache a
770 # miss; the datastore is the ultimate authority.
771 self._cache[key] = entity
773 raise tasklets.Return(entity)
775 @tasklets.tasklet
776 def put(self, entity, **ctx_options):
777 options = _make_ctx_options(ctx_options)
778 # TODO: What if the same entity is being put twice?
779 # TODO: What if two entities with the same key are being put?
780 key = entity._key
781 if key is None:
782 # Pass a dummy Key to _use_datastore().
783 key = model.Key(entity.__class__, None)
784 use_datastore = self._use_datastore(key, options)
785 use_memcache = None
786 memcache_deadline = None # Avoid worries about uninitialized variable.
788 if entity._has_complete_key():
789 use_memcache = self._use_memcache(key, options)
790 if use_memcache:
791 # Wait for memcache operations before starting datastore RPCs.
792 memcache_deadline = self._get_memcache_deadline(options)
793 mkey = self._memcache_prefix + key.urlsafe()
794 ns = key.namespace()
795 if use_datastore:
796 yield self.memcache_set(mkey, _LOCKED, time=_LOCK_TIME,
797 namespace=ns, use_cache=True,
798 deadline=memcache_deadline)
799 else:
800 pbs = entity._to_pb(set_key=False).SerializePartialToString()
801 # If the byte string to be written is too long for memcache,
802 # raise ValueError. (See LBYL explanation in get().)
803 if len(pbs) > memcache.MAX_VALUE_SIZE:
804 raise ValueError('Values may not be more than %d bytes in length; '
805 'received %d bytes' % (memcache.MAX_VALUE_SIZE,
806 len(pbs)))
807 timeout = self._get_memcache_timeout(key, options)
808 yield self.memcache_set(mkey, pbs, time=timeout, namespace=ns,
809 deadline=memcache_deadline)
811 if use_datastore:
812 key = yield self._put_batcher.add(entity, options)
813 if not isinstance(self._conn, datastore_rpc.TransactionalConnection):
814 if use_memcache is None:
815 use_memcache = self._use_memcache(key, options)
816 if use_memcache:
817 mkey = self._memcache_prefix + key.urlsafe()
818 ns = key.namespace()
819 # Don't use fire-and-forget -- see memcache_cas() in get().
820 yield self.memcache_delete(mkey, namespace=ns,
821 deadline=memcache_deadline)
823 if key is not None:
824 if entity._key != key:
825 logging.info('replacing key %s with %s', entity._key, key)
826 entity._key = key
827 # TODO: For updated entities, could we update the cache first?
828 if self._use_cache(key, options):
829 # TODO: What if by now the entity is already in the cache?
830 self._cache[key] = entity
832 raise tasklets.Return(key)
834 @tasklets.tasklet
835 def delete(self, key, **ctx_options):
836 options = _make_ctx_options(ctx_options)
837 if self._use_memcache(key, options):
838 memcache_deadline = self._get_memcache_deadline(options)
839 mkey = self._memcache_prefix + key.urlsafe()
840 ns = key.namespace()
841 # TODO: If not use_datastore, delete instead of lock?
842 yield self.memcache_set(mkey, _LOCKED, time=_LOCK_TIME, namespace=ns,
843 use_cache=True, deadline=memcache_deadline)
845 if self._use_datastore(key, options):
846 yield self._delete_batcher.add(key, options)
847 # TODO: Delete from memcache here?
849 if self._use_cache(key, options):
850 self._cache[key] = None
852 @tasklets.tasklet
853 def allocate_ids(self, key, size=None, max=None, **ctx_options):
854 options = _make_ctx_options(ctx_options)
855 lo_hi = yield self._conn.async_allocate_ids(options, key, size, max)
856 raise tasklets.Return(lo_hi)
858 @tasklets.tasklet
859 def get_indexes(self, **ctx_options):
860 options = _make_ctx_options(ctx_options)
861 index_list = yield self._conn.async_get_indexes(options)
862 raise tasklets.Return(index_list)
864 @utils.positional(3)
865 def map_query(self, query, callback, pass_batch_into_callback=None,
866 options=None, merge_future=None):
867 mfut = merge_future
868 if mfut is None:
869 mfut = tasklets.MultiFuture('map_query')
871 @tasklets.tasklet
872 def helper():
873 try:
874 inq = tasklets.SerialQueueFuture()
875 query.run_to_queue(inq, self._conn, options)
876 while True:
877 try:
878 batch, i, ent = yield inq.getq()
879 except EOFError:
880 break
881 ent = self._update_cache_from_query_result(ent, options)
882 if ent is None:
883 continue
884 if callback is None:
885 val = ent
886 else:
887 # TODO: If the callback raises, log and ignore.
888 if pass_batch_into_callback:
889 val = callback(batch, i, ent)
890 else:
891 val = callback(ent)
892 mfut.putq(val)
893 except GeneratorExit:
894 raise
895 except Exception, err:
896 _, _, tb = sys.exc_info()
897 mfut.set_exception(err, tb)
898 raise
899 else:
900 mfut.complete()
902 helper()
903 return mfut
905 def _update_cache_from_query_result(self, ent, options):
906 if isinstance(ent, model.Key):
907 return ent # It was a keys-only query and ent is really a Key.
908 if ent._projection:
909 return ent # Never cache partial entities (projection query results).
910 key = ent._key
911 if not self._use_cache(key, options):
912 return ent # This key should not be cached.
914 # Check the cache. If there is a valid cached entry, substitute
915 # that for the result, even if the cache has an explicit None.
916 if key in self._cache:
917 cached_ent = self._cache[key]
918 if (cached_ent is None or
919 cached_ent.key == key and cached_ent.__class__ is ent.__class__):
920 return cached_ent
922 # Update the cache.
923 self._cache[key] = ent
924 return ent
926 @utils.positional(2)
927 def iter_query(self, query, callback=None, pass_batch_into_callback=None,
928 options=None):
929 return self.map_query(query, callback=callback, options=options,
930 pass_batch_into_callback=pass_batch_into_callback,
931 merge_future=tasklets.SerialQueueFuture())
  @tasklets.tasklet
  def transaction(self, callback, **ctx_options):
    """Run callback() in a datastore transaction; returns a Future.

    Will invoke callback() one or more times (up to 1 + retries
    attempts on commit failure) with the default context set to a new,
    transactional Context.  Callback may be a tasklet; in that case it
    will be waited on.

    The propagation mode is taken from ctx_options
    (TransactionOptions.propagation) and defaults to NESTED, which in
    this implementation means "refuse to nest" -- see the branches
    below.

    Raises:
      datastore_errors.BadRequestError: on invalid nesting for the
        chosen propagation mode.
      datastore_errors.BadArgumentError: on an unknown propagation value.
      datastore_errors.TransactionFailedError: when all retries are
        exhausted without a successful commit.
    """
    options = _make_ctx_options(ctx_options, TransactionOptions)
    propagation = TransactionOptions.propagation(options)
    if propagation is None:
      propagation = TransactionOptions.NESTED

    parent = self
    if propagation == TransactionOptions.NESTED:
      # True nesting is unsupported: starting a transaction while one
      # is already active is an error in this mode.
      if self.in_transaction():
        raise datastore_errors.BadRequestError(
            'Nested transactions are not supported.')
    elif propagation == TransactionOptions.MANDATORY:
      # MANDATORY requires an already-active transaction; the callback
      # runs in the *current* transactional context -- no new
      # transaction is started and the retry loop below is skipped.
      if not self.in_transaction():
        raise datastore_errors.BadRequestError(
            'Requires an existing transaction.')
      result = callback()
      if isinstance(result, tasklets.Future):
        result = yield result
      raise tasklets.Return(result)
    elif propagation == TransactionOptions.ALLOWED:
      # ALLOWED joins an existing transaction if there is one;
      # otherwise execution falls through and a new transaction is
      # started below.
      if self.in_transaction():
        result = callback()
        if isinstance(result, tasklets.Future):
          result = yield result
        raise tasklets.Return(result)
    elif propagation == TransactionOptions.INDEPENDENT:
      # INDEPENDENT roots the new transaction at the nearest
      # non-transactional ancestor context.
      while parent.in_transaction():
        parent = parent._parent_context
        if parent is None:
          raise datastore_errors.BadRequestError(
              'Context without non-transactional ancestor')
    else:
      raise datastore_errors.BadArgumentError(
          'Invalid propagation value (%s).' % (propagation,))

    app = TransactionOptions.app(options) or key_module._DefaultAppId()
    # Note: zero retries means try it once.
    retries = TransactionOptions.retries(options)
    if retries is None:
      retries = 3
    # Flush the parent's auto-batchers first so pending non-transactional
    # work is not swept into the transaction.
    yield parent.flush()
    for _ in xrange(1 + max(0, retries)):
      transaction = yield parent._conn.async_begin_transaction(options, app)
      tconn = datastore_rpc.TransactionalConnection(
          adapter=parent._conn.adapter,
          config=parent._conn.config,
          transaction=transaction)
      tctx = parent.__class__(conn=tconn,
                              auto_batcher_class=parent._auto_batcher_class,
                              parent_context=parent)
      # Remember the previously active low-level datastore connection so
      # the finally clause can restore it after this attempt.
      tctx._old_ds_conn = datastore._GetConnection()
      ok = False
      try:
        # Copy memcache policies.  Note that get() will never use
        # memcache in a transaction, but put and delete should do their
        # memcache thing (which is to mark the key as deleted for
        # _LOCK_TIME seconds).  Also note that the in-process cache and
        # datastore policies keep their default (on) state.
        tctx.set_memcache_policy(parent.get_memcache_policy())
        tctx.set_memcache_timeout_policy(parent.get_memcache_timeout_policy())
        tasklets.set_context(tctx)
        datastore._SetConnection(tconn)  # For taskqueue coordination
        try:
          try:
            result = callback()
            if isinstance(result, tasklets.Future):
              result = yield result
          finally:
            # Always flush the transaction's batched work, whether the
            # callback succeeded or raised, before commit/rollback.
            yield tctx.flush()
        except GeneratorExit:
          # Tasklet cancellation: propagate, don't treat as a failure.
          raise
        except Exception:
          t, e, tb = sys.exc_info()
          tconn.async_rollback(options)  # Fire and forget.
          if issubclass(t, datastore_errors.Rollback):
            # Deliberate rollback: swallow the exception; the tasklet's
            # result will be None.
            # TODO: Raise value using tasklets.get_return_value(t)?
            return
          else:
            raise t, e, tb
        else:
          ok = yield tconn.async_commit(options)
          if ok:
            # Commit succeeded: fold the transaction's in-process cache
            # into the parent's and invalidate affected memcache keys.
            parent._cache.update(tctx._cache)
            yield parent._clear_memcache(tctx._cache)
            raise tasklets.Return(result)
        # The finally clause will run the on-commit queue.
      finally:
        datastore._SetConnection(tctx._old_ds_conn)
        del tctx._old_ds_conn
        if ok:
          # Call the callbacks collected in the transaction context's
          # on-commit queue.  If the transaction failed the queue is
          # abandoned.  We must do this after the connection has been
          # restored, but we can't do it after the for-loop because we
          # leave it by raising tasklets.Return().
          for on_commit_callback in tctx._on_commit_queue:
            on_commit_callback()  # This better not raise.

    # Out of retries
    raise datastore_errors.TransactionFailedError(
        'The transaction could not be committed. Please try again.')
1039 def in_transaction(self):
1040 """Return whether a transaction is currently active."""
1041 return isinstance(self._conn, datastore_rpc.TransactionalConnection)
1043 def call_on_commit(self, callback):
1044 """Call a callback upon successful commit of a transaction.
1046 If not in a transaction, the callback is called immediately.
1048 In a transaction, multiple callbacks may be registered and will be
1049 called once the transaction commits, in the order in which they
1050 were registered. If the transaction fails, the callbacks will not
1051 be called.
1053 If the callback raises an exception, it bubbles up normally. This
1054 means: If the callback is called immediately, any exception it
1055 raises will bubble up immediately. If the call is postponed until
1056 commit, remaining callbacks will be skipped and the exception will
1057 bubble up through the transaction() call. (However, the
1058 transaction is already committed at that point.)
1060 if not self.in_transaction():
1061 callback()
1062 else:
1063 self._on_commit_queue.append(callback)
1065 def clear_cache(self):
1066 """Clears the in-memory cache.
1068 NOTE: This does not affect memcache.
1070 self._cache.clear()
1072 @tasklets.tasklet
1073 def _clear_memcache(self, keys):
1074 keys = set(key for key in keys if self._use_memcache(key))
1075 futures = []
1076 for key in keys:
1077 mkey = self._memcache_prefix + key.urlsafe()
1078 ns = key.namespace()
1079 fut = self.memcache_delete(mkey, namespace=ns)
1080 futures.append(fut)
1081 yield futures
1083 @tasklets.tasklet
1084 def _memcache_get_tasklet(self, todo, options):
1085 if not todo:
1086 raise RuntimeError('Nothing to do.')
1087 for_cas, namespace, deadline = options
1088 keys = set()
1089 for unused_fut, key in todo:
1090 keys.add(key)
1091 rpc = memcache.create_rpc(deadline=deadline)
1092 results = yield self._memcache.get_multi_async(keys, for_cas=for_cas,
1093 namespace=namespace,
1094 rpc=rpc)
1095 for fut, key in todo:
1096 fut.set_result(results.get(key))
1098 @tasklets.tasklet
1099 def _memcache_set_tasklet(self, todo, options):
1100 if not todo:
1101 raise RuntimeError('Nothing to do.')
1102 opname, time, namespace, deadline = options
1103 methodname = opname + '_multi_async'
1104 method = getattr(self._memcache, methodname)
1105 mapping = {}
1106 for unused_fut, (key, value) in todo:
1107 mapping[key] = value
1108 rpc = memcache.create_rpc(deadline=deadline)
1109 results = yield method(mapping, time=time, namespace=namespace, rpc=rpc)
1110 for fut, (key, unused_value) in todo:
1111 if results is None:
1112 status = memcache.MemcacheSetResponse.ERROR
1113 else:
1114 status = results.get(key)
1115 fut.set_result(status == memcache.MemcacheSetResponse.STORED)
1117 @tasklets.tasklet
1118 def _memcache_del_tasklet(self, todo, options):
1119 if not todo:
1120 raise RuntimeError('Nothing to do.')
1121 seconds, namespace, deadline = options
1122 keys = set()
1123 for unused_fut, key in todo:
1124 keys.add(key)
1125 rpc = memcache.create_rpc(deadline=deadline)
1126 statuses = yield self._memcache.delete_multi_async(keys, seconds=seconds,
1127 namespace=namespace,
1128 rpc=rpc)
1129 status_key_mapping = {}
1130 if statuses: # On network error, statuses is None.
1131 for key, status in zip(keys, statuses):
1132 status_key_mapping[key] = status
1133 for fut, key in todo:
1134 status = status_key_mapping.get(key, memcache.DELETE_NETWORK_FAILURE)
1135 fut.set_result(status)
1137 @tasklets.tasklet
1138 def _memcache_off_tasklet(self, todo, options):
1139 if not todo:
1140 raise RuntimeError('Nothing to do.')
1141 initial_value, namespace, deadline = options
1142 mapping = {} # {key: delta}
1143 for unused_fut, (key, delta) in todo:
1144 mapping[key] = delta
1145 rpc = memcache.create_rpc(deadline=deadline)
1146 results = yield self._memcache.offset_multi_async(
1147 mapping, initial_value=initial_value, namespace=namespace, rpc=rpc)
1148 for fut, (key, unused_delta) in todo:
1149 fut.set_result(results.get(key))
1151 def memcache_get(self, key, for_cas=False, namespace=None, use_cache=False,
1152 deadline=None):
1153 """An auto-batching wrapper for memcache.get() or .get_multi().
1155 Args:
1156 key: Key to set. This must be a string; no prefix is applied.
1157 for_cas: If True, request and store CAS ids on the Context.
1158 namespace: Optional namespace.
1159 deadline: Optional deadline for the RPC.
1161 Returns:
1162 A Future (!) whose return value is the value retrieved from
1163 memcache, or None.
1165 if not isinstance(key, basestring):
1166 raise TypeError('key must be a string; received %r' % key)
1167 if not isinstance(for_cas, bool):
1168 raise TypeError('for_cas must be a bool; received %r' % for_cas)
1169 if namespace is None:
1170 namespace = namespace_manager.get_namespace()
1171 options = (for_cas, namespace, deadline)
1172 batcher = self._memcache_get_batcher
1173 if use_cache:
1174 return batcher.add_once(key, options)
1175 else:
1176 return batcher.add(key, options)
  # TODO: Add full Args/Returns docstrings to the memcache_* wrappers below.
1180 def memcache_gets(self, key, namespace=None, use_cache=False, deadline=None):
1181 return self.memcache_get(key, for_cas=True, namespace=namespace,
1182 use_cache=use_cache, deadline=deadline)
1184 def memcache_set(self, key, value, time=0, namespace=None, use_cache=False,
1185 deadline=None):
1186 if not isinstance(key, basestring):
1187 raise TypeError('key must be a string; received %r' % key)
1188 if not isinstance(time, (int, long)):
1189 raise TypeError('time must be a number; received %r' % time)
1190 if namespace is None:
1191 namespace = namespace_manager.get_namespace()
1192 options = ('set', time, namespace, deadline)
1193 batcher = self._memcache_set_batcher
1194 if use_cache:
1195 return batcher.add_once((key, value), options)
1196 else:
1197 return batcher.add((key, value), options)
1199 def memcache_add(self, key, value, time=0, namespace=None, deadline=None):
1200 if not isinstance(key, basestring):
1201 raise TypeError('key must be a string; received %r' % key)
1202 if not isinstance(time, (int, long)):
1203 raise TypeError('time must be a number; received %r' % time)
1204 if namespace is None:
1205 namespace = namespace_manager.get_namespace()
1206 return self._memcache_set_batcher.add((key, value),
1207 ('add', time, namespace, deadline))
1209 def memcache_replace(self, key, value, time=0, namespace=None, deadline=None):
1210 if not isinstance(key, basestring):
1211 raise TypeError('key must be a string; received %r' % key)
1212 if not isinstance(time, (int, long)):
1213 raise TypeError('time must be a number; received %r' % time)
1214 if namespace is None:
1215 namespace = namespace_manager.get_namespace()
1216 options = ('replace', time, namespace, deadline)
1217 return self._memcache_set_batcher.add((key, value), options)
1219 def memcache_cas(self, key, value, time=0, namespace=None, deadline=None):
1220 if not isinstance(key, basestring):
1221 raise TypeError('key must be a string; received %r' % key)
1222 if not isinstance(time, (int, long)):
1223 raise TypeError('time must be a number; received %r' % time)
1224 if namespace is None:
1225 namespace = namespace_manager.get_namespace()
1226 return self._memcache_set_batcher.add((key, value),
1227 ('cas', time, namespace, deadline))
1229 def memcache_delete(self, key, seconds=0, namespace=None, deadline=None):
1230 if not isinstance(key, basestring):
1231 raise TypeError('key must be a string; received %r' % key)
1232 if not isinstance(seconds, (int, long)):
1233 raise TypeError('seconds must be a number; received %r' % seconds)
1234 if namespace is None:
1235 namespace = namespace_manager.get_namespace()
1236 return self._memcache_del_batcher.add(key, (seconds, namespace, deadline))
1238 def memcache_incr(self, key, delta=1, initial_value=None, namespace=None,
1239 deadline=None):
1240 if not isinstance(key, basestring):
1241 raise TypeError('key must be a string; received %r' % key)
1242 if not isinstance(delta, (int, long)):
1243 raise TypeError('delta must be a number; received %r' % delta)
1244 if initial_value is not None and not isinstance(initial_value, (int, long)):
1245 raise TypeError('initial_value must be a number or None; received %r' %
1246 initial_value)
1247 if namespace is None:
1248 namespace = namespace_manager.get_namespace()
1249 return self._memcache_off_batcher.add((key, delta),
1250 (initial_value, namespace, deadline))
1252 def memcache_decr(self, key, delta=1, initial_value=None, namespace=None,
1253 deadline=None):
1254 if not isinstance(key, basestring):
1255 raise TypeError('key must be a string; received %r' % key)
1256 if not isinstance(delta, (int, long)):
1257 raise TypeError('delta must be a number; received %r' % delta)
1258 if initial_value is not None and not isinstance(initial_value, (int, long)):
1259 raise TypeError('initial_value must be a number or None; received %r' %
1260 initial_value)
1261 if namespace is None:
1262 namespace = namespace_manager.get_namespace()
1263 return self._memcache_off_batcher.add((key, -delta),
1264 (initial_value, namespace, deadline))
1266 @tasklets.tasklet
1267 def urlfetch(self, url, payload=None, method='GET', headers={},
1268 allow_truncated=False, follow_redirects=True,
1269 validate_certificate=None, deadline=None, callback=None):
1270 rpc = urlfetch.create_rpc(deadline=deadline, callback=callback)
1271 urlfetch.make_fetch_call(rpc, url,
1272 payload=payload,
1273 method=method,
1274 headers=headers,
1275 allow_truncated=allow_truncated,
1276 follow_redirects=follow_redirects,
1277 validate_certificate=validate_certificate)
1278 result = yield rpc
1279 raise tasklets.Return(result)