App Engine Python SDK version 1.9.9
[gae.git] / python / google / appengine / api / datastore_file_stub.py
blob586a05e360b21b407d12f30c31cf626e3dd07773
1 #!/usr/bin/env python
3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
22 """
23 In-memory persistent stub for the Python datastore API. Gets, queries,
24 and searches are implemented as in-memory scans over all entities.
26 Stores entities across sessions as pickled proto bufs in a single file. On
27 startup, all entities are read from the file and loaded into memory. On
28 every Put(), the file is wiped and all entities are written from scratch.
29 Clients can also manually Read() and Write() the file themselves.
30 """
43 import collections
44 import logging
45 import os
46 import struct
47 import sys
48 import tempfile
49 import threading
50 import weakref
54 import cPickle as pickle
56 from google.appengine.api import apiproxy_stub
57 from google.appengine.api import datastore
58 from google.appengine.api import datastore_types
59 from google.appengine.datastore import datastore_pb
60 from google.appengine.datastore import datastore_stub_util
61 from google.appengine.runtime import apiproxy_errors
62 from google.net.proto import ProtocolBuffer
63 from google.appengine.datastore import entity_pb
# datastore_pb.Query defines __eq__ but is not hashable by default; key it
# by its serialized form so Query objects can be used as dict/set keys.
datastore_pb.Query.__hash__ = lambda self: hash(self.Encode())
70 def _FinalElement(key):
71 """Return final element of a key's path."""
72 return key.path().element_list()[-1]
75 class _StoredEntity(object):
76 """Simple wrapper around an entity stored by the stub.
78 Public properties:
79 protobuf: Native protobuf Python object, entity_pb.EntityProto.
80 encoded_protobuf: Encoded binary representation of above protobuf.
81 """
83 def __init__(self, entity):
84 """Create a _StoredEntity object and store an entity.
86 Args:
87 entity: entity_pb.EntityProto to store.
88 """
89 self.protobuf = entity
96 self.encoded_protobuf = entity.Encode()
class KindPseudoKind(object):
  """Pseudo-kind for schema queries over kind names.

  Provides a Query method to perform the actual query.

  Public properties:
    name: the pseudo-kind name
  """
  name = '__kind__'

  def Query(self, query, filters, orders):
    """Perform a __kind__ query.

    Args:
      query: the original datastore_pb.Query.
      filters: the filters from query.
      orders: the orders from query.

    Returns:
      (results, remaining_filters, remaining_orders)
      results is a list of entity_pb.EntityProto
      remaining_filters and remaining_orders are the filters and orders that
      should be applied in memory
    """
    kind_range = datastore_stub_util.ParseKindQuery(query, filters, orders)
    target_app_ns = datastore_types.EncodeAppIdNamespace(
        query.app(), query.name_space())

    # One result entity per stored kind of this app/namespace that falls
    # inside the parsed kind range.
    results = []
    for app_namespace, kind in self._stub._GetAllEntities():
      if app_namespace != target_app_ns:
        continue
      kind = kind.decode('utf-8')
      if kind_range.Contains(kind):
        kind_entity = datastore.Entity(self.name, name=kind,
                                       _app=query.app(),
                                       namespace=query.name_space())
        results.append(kind_entity._ToPb())

    # All filters/orders were consumed by ParseKindQuery; nothing remains to
    # be applied in memory.
    return (results, [], [])
class PropertyPseudoKind(object):
  """Pseudo-kind for schema queries.

  Provides a Query method to perform the actual query.

  Public properties:
    name: the pseudo-kind name
  """
  name = '__property__'

  def Query(self, query, filters, orders):
    """Perform a query on this pseudo-kind.

    Args:
      query: the original datastore_pb.Query.
      filters: the filters from query.
      orders: the orders from query.

    Returns:
      (results, remaining_filters, remaining_orders)
      results is a list of entity_pb.EntityProto
      remaining_filters and remaining_orders are the filters and orders that
      should be applied in memory
    """
    property_range = datastore_stub_util.ParsePropertyQuery(query, filters,
                                                            orders)
    keys_only = query.keys_only()
    app_namespace_str = datastore_types.EncodeAppIdNamespace(
        query.app(), query.name_space())

    properties = []
    # Keys-only results are cached under a separate key because they omit the
    # 'property_representation' values that full results carry.
    if keys_only:
      usekey = '__property__keys'
    else:
      usekey = '__property__'

    entities = self._stub._GetAllEntities()
    for app_namespace, kind in entities:
      if app_namespace != app_namespace_str: continue

      app_kind = (app_namespace_str, kind)
      kind = kind.decode('utf-8')

      # Skip kinds that fall entirely outside the requested property range:
      # compare this kind against the range's extremes (cmp via the first
      # element of each extreme, which is the kind name).
      (start_cmp, end_cmp) = property_range.MapExtremes(
          lambda extreme, inclusive, is_end: cmp(kind, extreme[0]))
      if not((start_cmp is None or start_cmp >= 0) and
             (end_cmp is None or end_cmp <= 0)):
        continue

      # Build this kind's property entities, reusing the schema cache when a
      # previous query already computed them (invalidated on writes by
      # _StoreEntity/_Delete).
      kind_properties = self._stub._GetSchemaCache(app_kind, usekey)
      if not kind_properties:
        kind_properties = []
        kind_key = datastore_types.Key.from_path(KindPseudoKind.name, kind,
                                                 _app=query.app(),
                                                 namespace=query.name_space())

        # Map property name -> set of value-type tags observed for it.
        props = collections.defaultdict(set)

        # Scan every stored entity of this kind to collect property names and
        # the representation tags of their values.
        for entity in entities[app_kind].values():
          for prop in entity.protobuf.property_list():
            prop_name = prop.name()

            # Internal bookkeeping properties are never reported.
            if (prop_name in
                datastore_stub_util.GetInvisibleSpecialPropertyNames()):
              continue
            value_pb = prop.value()
            props[prop_name].add(datastore_types.GetPropertyValueTag(value_pb))

        # Emit one __property__ entity per property, sorted by name, parented
        # under the corresponding __kind__ key.
        for prop in sorted(props):
          property_e = datastore.Entity(self.name, name=prop, parent=kind_key,
                                        _app=query.app(),
                                        namespace=query.name_space())

          if not keys_only and props[prop]:
            property_e['property_representation'] = [
                datastore_stub_util._PROPERTY_TYPE_NAMES[tag]
                for tag in sorted(props[prop])]

          kind_properties.append(property_e._ToPb())

        self._stub._SetSchemaCache(app_kind, usekey, kind_properties)

      # Filter the (possibly cached) per-kind list down to the requested
      # range. NOTE: InQuery closes over the loop variable 'kind'; it is only
      # called within this iteration, so the binding is correct here.
      def InQuery(property_e):
        return property_range.Contains(
            (kind, _FinalElement(property_e.key()).name()))
      properties += filter(InQuery, kind_properties)

    return (properties, [], [])
class NamespacePseudoKind(object):
  """Pseudo-kind for namespace queries.

  Provides a Query method to perform the actual query.

  Public properties:
    name: the pseudo-kind name
  """
  name = '__namespace__'

  def Query(self, query, filters, orders):
    """Perform a __namespace__ query.

    Args:
      query: the original datastore_pb.Query.
      filters: the filters from query.
      orders: the orders from query.

    Returns:
      (results, remaining_filters, remaining_orders)
      results is a list of entity_pb.EntityProto
      remaining_filters and remaining_orders are the filters and orders that
      should be applied in memory
    """
    namespace_range = datastore_stub_util.ParseNamespaceQuery(query, filters,
                                                              orders)
    wanted_app = query.app()

    # Collect the distinct namespaces of this app that fall in range.
    matching = set()
    for encoded_app_ns, _ in self._stub._GetAllEntities():
      app_id, namespace = datastore_types.DecodeAppIdNamespace(encoded_app_ns)
      if app_id == wanted_app and namespace_range.Contains(namespace):
        matching.add(namespace)

    # Build one result entity per namespace. The default (empty-string)
    # namespace cannot be a key name, so it is keyed by a reserved id.
    results = []
    for namespace in matching:
      if namespace:
        namespace_entity = datastore.Entity(self.name, name=namespace,
                                            _app=query.app())
      else:
        namespace_entity = datastore.Entity(
            self.name,
            id=datastore_types._EMPTY_NAMESPACE_ID,
            _app=query.app())
      results.append(namespace_entity._ToPb())

    return (results, [], [])
class DatastoreFileStub(datastore_stub_util.BaseDatastore,
                        apiproxy_stub.APIProxyStub,
                        datastore_stub_util.DatastoreStub):
  """ Persistent stub for the Python datastore API.

  Stores all entities in memory, and persists them to a file as pickled
  protocol buffers. A DatastoreFileStub instance handles a single app's data
  and is backed by files on disk.
  """

  def __init__(self,
               app_id,
               datastore_file,
               history_file=None,
               require_indexes=False,
               service_name='datastore_v3',
               trusted=False,
               consistency_policy=None,
               save_changes=True,
               root_path=None,
               use_atexit=True,
               auto_id_policy=datastore_stub_util.SEQUENTIAL):
    """Constructor.

    Initializes and loads the datastore from the backing files, if they exist.

    Args:
      app_id: string
      datastore_file: string, stores all entities across sessions. Use None
          not to use a file.
      history_file: DEPRECATED. No-op.
      require_indexes: bool, default False. If True, composite indexes must
          exist in index.yaml for queries that need them.
      service_name: Service name expected for all calls.
      trusted: bool, default False. If True, this stub allows an app to
          access the data of another app.
      consistency_policy: The consistency policy to use or None to use the
          default. Consistency policies can be found in
          datastore_stub_util.*ConsistencyPolicy
      save_changes: bool, default True. If this stub should modify
          datastore_file when entities are changed.
      root_path: string, the root path of the app.
      use_atexit: bool, indicates if the stub should save itself atexit.
      auto_id_policy: enum, datastore_stub_util.SEQUENTIAL or .SCATTERED
    """
    self.__datastore_file = datastore_file
    self.__save_changes = save_changes

    # Maps (encoded app-id+namespace, kind) -> {entity key: _StoredEntity},
    # and entity-group key -> {entity key: entity_pb.EntityProto}.
    self.__entities_by_kind = collections.defaultdict(dict)
    self.__entities_by_group = collections.defaultdict(dict)
    self.__entities_lock = threading.Lock()

    # Per-(app_kind) cache of pseudo-kind query results; invalidated by
    # _StoreEntity and _Delete.
    self.__schema_cache = {}

    # One auto-id counter per id space, each starting at 1.
    self.__id_counters = {datastore_stub_util.SEQUENTIAL: 1L,
                          datastore_stub_util.SCATTERED: 1L
                         }
    self.__id_lock = threading.Lock()

    # Serializes reads/writes of the backing datastore file.
    self.__file_lock = threading.Lock()

    datastore_stub_util.BaseDatastore.__init__(
        self, require_indexes, consistency_policy,
        use_atexit and self.__IsSaveable(), auto_id_policy)
    apiproxy_stub.APIProxyStub.__init__(self, service_name)
    # weakref.proxy avoids a reference cycle between the stub and its
    # DatastoreStub base.
    datastore_stub_util.DatastoreStub.__init__(self, weakref.proxy(self),
                                               app_id, trusted, root_path)

    self._RegisterPseudoKind(KindPseudoKind())
    self._RegisterPseudoKind(PropertyPseudoKind())
    self._RegisterPseudoKind(NamespacePseudoKind())
    self._RegisterPseudoKind(datastore_stub_util.EntityGroupPseudoKind())

    # Merge any persisted entities into memory.
    self.Read()

  def Clear(self):
    """ Clears the datastore by deleting all currently stored entities and
    queries. """
    self.__entities_lock.acquire()
    try:
      datastore_stub_util.BaseDatastore.Clear(self)
      datastore_stub_util.DatastoreStub.Clear(self)

      self.__entities_by_kind = collections.defaultdict(dict)
      self.__entities_by_group = collections.defaultdict(dict)
      self.__schema_cache = {}
    finally:
      self.__entities_lock.release()

  def _GetAllEntities(self):
    """Get all entities.

    Returns:
      Map from (encoded app-id+namespace, kind) to dicts of entity key ->
      _StoredEntity. Do not modify directly.
    """
    return self.__entities_by_kind

  def _GetEntityLocation(self, key):
    """Get keys to self.__entities_by_* from the given key.

    Example usage:
      app_kind, eg_k, k = self._GetEntityLocation(key)
      self.__entities_by_kind[app_kind][k]
      self.__entities_by_entity_group[eg_k][k]

    Args:
      key: entity_pb.Reference

    Returns:
      Tuple (by_kind key, by_entity_group key, entity key)
    """
    app_ns = datastore_types.EncodeAppIdNamespace(key.app(), key.name_space())
    kind = _FinalElement(key).type()
    entity_group = datastore_stub_util._GetEntityGroup(key)
    eg_k = datastore_types.ReferenceToKeyValue(entity_group)
    k = datastore_types.ReferenceToKeyValue(key)

    return ((app_ns, kind), eg_k, k)

  def _StoreEntity(self, entity, insert=False):
    """ Store the given entity.

    Any needed locking should be managed by the caller.

    Args:
      entity: The entity_pb.EntityProto to store.
      insert: If we should check for existence.
    """
    app_kind, eg_k, k = self._GetEntityLocation(entity.key())

    # insert=True asserts the key is not already present.
    assert not insert or k not in self.__entities_by_kind[app_kind]

    self.__entities_by_kind[app_kind][k] = _StoredEntity(entity)
    self.__entities_by_group[eg_k][k] = entity

    # The kind's schema may have changed, so drop any cached pseudo-kind
    # query results for it.
    if app_kind in self.__schema_cache:
      del self.__schema_cache[app_kind]

  # Exceptions that EntityProto decoding can raise on corrupt input.
  READ_PB_EXCEPTIONS = (ProtocolBuffer.ProtocolBufferDecodeError, LookupError,
                        TypeError, ValueError)
  READ_ERROR_MSG = ('Data in %s is corrupt or a different version. '
                    'Try running with the --clear_datastore flag.\n%r')
  READ_PY250_MSG = ('Are you using FloatProperty and/or GeoPtProperty? '
                    'Unfortunately loading float values from the datastore '
                    'file does not work with Python 2.5.0. '
                    'Please upgrade to a newer Python 2.5 release or use '
                    'the --clear_datastore flag.\n')

  def Read(self):
    """ Reads the datastore and history files into memory.

    The in-memory query history is cleared, but the datastore is *not*
    cleared; the entities in the files are merged into the entities in memory.
    If you want them to overwrite the in-memory datastore, call Clear() before
    calling Read().

    If the datastore file contains an entity with the same app name, kind, and
    key as an entity already in the datastore, the entity from the file
    overwrites the entity in the datastore.

    Also sets each ID counter to one greater than the highest ID allocated so
    far in that counter's ID space.
    """
    if self.__datastore_file and self.__datastore_file != '/dev/null':
      for encoded_entity in self.__ReadPickled(self.__datastore_file):
        try:
          entity = entity_pb.EntityProto(encoded_entity)
        except self.READ_PB_EXCEPTIONS, e:
          raise apiproxy_errors.ApplicationError(
              datastore_pb.Error.INTERNAL_ERROR,
              self.READ_ERROR_MSG % (self.__datastore_file, e))
        except struct.error, e:
          # Python 2.5.0 has a known struct bug unpacking floats pickled by
          # other releases; surface a targeted message for that case only.
          if (sys.version_info[0:3] == (2, 5, 0)
              and e.message.startswith('unpack requires a string argument')):
            raise apiproxy_errors.ApplicationError(
                datastore_pb.Error.INTERNAL_ERROR,
                self.READ_PY250_MSG + self.READ_ERROR_MSG %
                (self.__datastore_file, e))
          else:
            raise

        self._StoreEntity(entity)

        # Track the highest numeric id seen so auto-allocation never reuses
        # an id already present in the file.
        last_path = _FinalElement(entity.key())
        if last_path.id():
          self._SetMaxId(last_path.id())

  def Write(self):
    """Writes out the datastore and history files.

    Be careful! If the files already exist, this method overwrites them!
    """
    super(DatastoreFileStub, self).Write()
    self.__WriteDatastore()

  def __IsSaveable(self):
    # True when a real backing file is configured and saving is enabled.
    return (self.__datastore_file and self.__datastore_file != '/dev/null' and
            self.__save_changes)

  def __WriteDatastore(self):
    """ Writes out the datastore file. Be careful! If the file already exists,
    this method overwrites it!
    """
    if self.__IsSaveable():
      # Persist the pre-encoded bytes cached on each _StoredEntity.
      encoded = []
      for kind_dict in self.__entities_by_kind.values():
        encoded.extend(entity.encoded_protobuf for entity in kind_dict.values())

      self.__WritePickled(encoded, self.__datastore_file)

  def __ReadPickled(self, filename):
    """Reads a pickled object from the given file and returns it.

    Returns [] if the file is missing, empty, or a sentinel path; raises
    ApplicationError(INTERNAL_ERROR) if unpickling fails.
    """
    self.__file_lock.acquire()

    try:
      try:
        if (filename and
            filename != '/dev/null' and
            os.path.isfile(filename) and
            os.stat(filename).st_size > 0):
          return pickle.load(open(filename, 'rb'))
        else:
          logging.warning('Could not read datastore data from %s', filename)
      except (AttributeError, LookupError, ImportError, NameError, TypeError,
              ValueError, struct.error, pickle.PickleError), e:
        # Any of the above can surface from unpickling a corrupt or
        # version-incompatible file; report them uniformly.
        raise apiproxy_errors.ApplicationError(
            datastore_pb.Error.INTERNAL_ERROR,
            'Could not read data from %s. Try running with the '
            '--clear_datastore flag. Cause:\n%r' % (filename, e))
    finally:
      self.__file_lock.release()

    return []

  def __WritePickled(self, obj, filename):
    """Pickles the object and writes it to the given file.

    Writes to a temp file in the same directory, then renames it into place
    so readers never observe a partially-written file.
    """
    if not filename or filename == '/dev/null' or not obj:
      return

    descriptor, tmp_filename = tempfile.mkstemp(dir=os.path.dirname(filename))
    tmpfile = os.fdopen(descriptor, 'wb')

    # Protocol 1 with pickler.fast=True: no memo records, smaller output.
    pickler = pickle.Pickler(tmpfile, protocol=1)
    pickler.fast = True
    pickler.dump(obj)

    tmpfile.close()

    self.__file_lock.acquire()
    try:
      try:
        os.rename(tmp_filename, filename)
      except OSError:
        # Presumably for platforms where rename won't replace an existing
        # file (e.g. Windows): remove the target, then retry the rename.
        try:
          os.remove(filename)
        except:
          pass
        os.rename(tmp_filename, filename)
    finally:
      self.__file_lock.release()

  def MakeSyncCall(self, service, call, request, response, request_id=None):
    """ The main RPC entry point. service must be 'datastore_v3'."""
    self.assertPbIsInitialized(request)
    super(DatastoreFileStub, self).MakeSyncCall(service,
                                                call,
                                                request,
                                                response,
                                                request_id)
    self.assertPbIsInitialized(response)

  def assertPbIsInitialized(self, pb):
    """Raises an exception if the given PB is not initialized and valid."""
    explanation = []
    assert pb.IsInitialized(explanation), explanation

    # Encoding performs additional validation beyond IsInitialized.
    pb.Encode()

  def _GetSchemaCache(self, kind, usekey):
    # Returns the cached value for (kind, usekey), or None on a miss.
    if kind in self.__schema_cache and usekey in self.__schema_cache[kind]:
      return self.__schema_cache[kind][usekey]
    else:
      return None

  def _SetSchemaCache(self, kind, usekey, value):
    # Stores value under (kind, usekey), creating the per-kind dict lazily.
    if kind not in self.__schema_cache:
      self.__schema_cache[kind] = {}
    self.__schema_cache[kind][usekey] = value

  def _Put(self, entity, insert):
    # Normalize the entity for storage, then store it under the lock.
    entity = datastore_stub_util.StoreEntity(entity)

    self.__entities_lock.acquire()
    try:
      self._StoreEntity(entity, insert)
    finally:
      self.__entities_lock.release()

  def _Get(self, key):
    # Returns a copy of the stored entity, or None (implicitly) if absent.
    app_kind, _, k = self._GetEntityLocation(key)

    try:
      return datastore_stub_util.LoadEntity(
          self.__entities_by_kind[app_kind][k].protobuf)
    except KeyError:
      pass

  def _Delete(self, key):
    app_kind, eg_k, k = self._GetEntityLocation(key)

    self.__entities_lock.acquire()
    try:
      del self.__entities_by_kind[app_kind][k]
      del self.__entities_by_group[eg_k][k]
      # Drop empty index buckets so iteration stays proportional to data.
      if not self.__entities_by_kind[app_kind]:
        del self.__entities_by_kind[app_kind]
      if not self.__entities_by_group[eg_k]:
        del self.__entities_by_group[eg_k]

      del self.__schema_cache[app_kind]
    except KeyError:
      # Deleting a missing entity (or a kind with no cached schema) is a
      # silent no-op.
      pass
    finally:
      self.__entities_lock.release()

  def _GetEntitiesInEntityGroup(self, entity_group):
    # Copy so callers can iterate without holding the entities lock.
    eg_k = datastore_types.ReferenceToKeyValue(entity_group)
    return self.__entities_by_group[eg_k].copy()

  def _GetQueryCursor(self, query, filters, orders, index_list,
                      filter_predicate=None):
    app_id = query.app()
    namespace = query.name_space()

    pseudo_kind = None
    if query.has_kind() and query.kind() in self._pseudo_kinds:
      pseudo_kind = self._pseudo_kinds[query.kind()]

    # Gather candidate entities under the lock; filtering/sorting happens
    # afterwards in _ExecuteQuery.
    self.__entities_lock.acquire()
    try:
      app_ns = datastore_types.EncodeAppIdNamespace(app_id, namespace)
      if pseudo_kind:
        # Pseudo-kinds produce results and may consume filters/orders.
        (results, filters, orders) = pseudo_kind.Query(query, filters, orders)
      elif query.has_kind():
        results = [entity.protobuf for entity in
                   self.__entities_by_kind[app_ns, query.kind()].values()]
      else:
        # Kindless query: scan every kind in this app/namespace.
        results = []
        for (cur_app_ns, _), entities in self.__entities_by_kind.iteritems():
          if cur_app_ns == app_ns:
            results.extend(entity.protobuf for entity in entities.itervalues())
    except KeyError:
      results = []
    finally:
      self.__entities_lock.release()

    return datastore_stub_util._ExecuteQuery(results, query, filters, orders,
                                             index_list, filter_predicate)

  def _SetIdCounter(self, id_space, value):
    """Set the ID counter for id_space to value."""
    self.__id_counters[id_space] = value

  def _IdCounter(self, id_space):
    """Return current value of ID counter for id_space."""
    return self.__id_counters[id_space]

  def _SetMaxId(self, max_id):
    """Infer ID space and advance corresponding counter."""
    count, id_space = datastore_stub_util.IdToCounter(max_id)
    if count >= self._IdCounter(id_space):
      self._SetIdCounter(id_space, count + 1)

  def _AllocateSequentialIds(self, reference, size=1, max_id=None):
    # Exactly one of size/max_id may be specified.
    datastore_stub_util.Check(not (size and max_id),
                              'Both size and max cannot be set.')

    self.__id_lock.acquire()
    try:
      id_space = datastore_stub_util.SEQUENTIAL
      start = self._IdCounter(id_space)
      if size:
        datastore_stub_util.Check(size > 0, 'Size must be greater than 0.')
        self._SetIdCounter(id_space, start + size)
      elif max_id:
        datastore_stub_util.Check(max_id >=0,
                                  'Max must be greater than or equal to 0.')
        # max_id mode only ever advances the counter, never rewinds it.
        self._SetIdCounter(id_space, max(start, max_id + 1))
      end = self._IdCounter(id_space) - 1
    finally:
      self.__id_lock.release()

    # Inclusive range of allocated ids.
    return (start, end)

  def _AllocateIds(self, keys):
    self.__id_lock.acquire()
    full_keys = []
    try:
      for key in keys:
        last_element = _FinalElement(key)

        if last_element.id() or last_element.has_name():
          # Complete key: just record any numeric ids so future allocations
          # skip past them.
          for el in key.path().element_list():
            if el.id():
              self._SetMaxId(el.id())

        else:
          # Incomplete key: assign the next scattered id.
          id_space = datastore_stub_util.SCATTERED
          count = self._IdCounter(id_space)
          last_element.set_id(datastore_stub_util.ToScatteredId(count))
          self._SetIdCounter(id_space, count + 1)
        full_keys.append(key)
    finally:
      self.__id_lock.release()

    return full_keys