1.9.30 sync.
[gae.git] / python / google / appengine / api / datastore_file_stub.py
blob1e326a16a78e90c40128ae264056bc6001a511c4
1 #!/usr/bin/env python
3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
22 """
23 In-memory persistent stub for the Python datastore API. Gets, queries,
24 and searches are implemented as in-memory scans over all entities.
26 Stores entities across sessions as pickled proto bufs in a single file. On
27 startup, all entities are read from the file and loaded into memory. On
28 every Put(), the file is wiped and all entities are written from scratch.
29 Clients can also manually Read() and Write() the file themselves.
30 """
44 import collections
45 import logging
46 import os
47 import struct
48 import sys
49 import tempfile
50 import threading
51 import weakref
55 import cPickle as pickle
57 from google.appengine.api import apiproxy_stub
58 from google.appengine.api import datastore
59 from google.appengine.api import datastore_types
60 from google.appengine.datastore import datastore_pb
61 from google.appengine.datastore import datastore_stub_util
62 from google.appengine.runtime import apiproxy_errors
63 from google.net.proto import ProtocolBuffer
64 from google.appengine.datastore import entity_pb
# Query protos don't define __hash__; patch one in (hashing the encoded
# bytes) so Query objects can be used as dict keys / set members.
datastore_pb.Query.__hash__ = lambda self: hash(self.Encode())
71 def _FinalElement(key):
72 """Return final element of a key's path."""
73 return key.path().element_list()[-1]
76 class _StoredEntity(object):
77 """Simple wrapper around an entity stored by the stub.
79 Public properties:
80 protobuf: Native protobuf Python object, entity_pb.EntityProto.
81 encoded_protobuf: Encoded binary representation of above protobuf.
82 """
84 def __init__(self, entity):
85 """Create a _StoredEntity object and store an entity.
87 Args:
88 entity: entity_pb.EntityProto to store.
89 """
90 self.protobuf = entity
97 self.encoded_protobuf = entity.Encode()
class KindPseudoKind(object):
  """Pseudo-kind for schema queries.

  Provides a Query method to perform the actual query.

  Public properties:
    name: the pseudo-kind name
  """
  name = '__kind__'

  def Query(self, query, filters, orders):
    """Perform a query on this pseudo-kind.

    Args:
      query: the original datastore_pb.Query.
      filters: the filters from query.
      orders: the orders from query.

    Returns:
      (results, remaining_filters, remaining_orders)
      results is a list of entity_pb.EntityProto
      remaining_filters and remaining_orders are the filters and orders that
      should be applied in memory
    """
    kind_range = datastore_stub_util.ParseKindQuery(query, filters, orders)
    target_app_ns = datastore_types.EncodeAppIdNamespace(
        query.app(), query.name_space())

    results = []
    # _GetAllEntities() is keyed by (app-namespace, kind); emit one synthetic
    # __kind__ entity per kind that falls inside the requested range.
    for app_ns, raw_kind in self._stub._GetAllEntities():
      if app_ns != target_app_ns:
        continue
      decoded_kind = raw_kind.decode('utf-8')
      if kind_range.Contains(decoded_kind):
        results.append(
            datastore.Entity(self.name, name=decoded_kind, _app=query.app(),
                             namespace=query.name_space())._ToPb())

    # All range filtering was applied above; nothing remains for in-memory
    # post-processing.
    return (results, [], [])
class PropertyPseudoKind(object):
  """Pseudo-kind for schema queries.

  Provides a Query method to perform the actual query.

  Public properties:
    name: the pseudo-kind name
  """
  name = '__property__'

  def Query(self, query, filters, orders):
    """Perform a query on this pseudo-kind.

    Args:
      query: the original datastore_pb.Query.
      filters: the filters from query.
      orders: the orders from query.

    Returns:
      (results, remaining_filters, remaining_orders)
      results is a list of entity_pb.EntityProto
      remaining_filters and remaining_orders are the filters and orders that
      should be applied in memory
    """
    property_range = datastore_stub_util.ParsePropertyQuery(query, filters,
                                                            orders)
    keys_only = query.keys_only()
    app_namespace_str = datastore_types.EncodeAppIdNamespace(
        query.app(), query.name_space())

    properties = []
    # Keys-only results omit property_representation, so they are cached
    # under a separate key from full results.
    if keys_only:
      usekey = '__property__keys'
    else:
      usekey = '__property__'

    entities = self._stub._GetAllEntities()
    for app_namespace, kind in entities:
      if app_namespace != app_namespace_str: continue

      app_kind = (app_namespace_str, kind)
      kind = kind.decode('utf-8')

      # Cheap pre-filter: compare this kind against the extremes of the
      # requested property range and skip kinds entirely outside it.
      # (Py2-only: relies on the builtin cmp().)
      (start_cmp, end_cmp) = property_range.MapExtremes(
          lambda extreme, inclusive, is_end: cmp(kind, extreme[0]))
      if not((start_cmp is None or start_cmp >= 0) and
             (end_cmp is None or end_cmp <= 0)):
        continue

      # Rebuild the per-kind property list only on a schema-cache miss; the
      # cache is invalidated by _StoreEntity when this kind changes.
      kind_properties = self._stub._GetSchemaCache(app_kind, usekey)
      if not kind_properties:
        kind_properties = []
        kind_key = datastore_types.Key.from_path(KindPseudoKind.name, kind,
                                                 _app=query.app(),
                                                 namespace=query.name_space())

        # Maps property name -> set of value-type tags seen for it.
        props = collections.defaultdict(set)

        for entity in entities[app_kind].values():
          for prop in entity.protobuf.property_list():
            prop_name = prop.name()

            if (prop_name in
                datastore_stub_util.GetInvisibleSpecialPropertyNames()):
              continue
            value_pb = prop.value()
            props[prop_name].add(datastore_types.GetPropertyValueTag(value_pb))

        # Emit one __property__ entity per property, parented on the kind.
        for prop in sorted(props):
          property_e = datastore.Entity(self.name, name=prop, parent=kind_key,
                                        _app=query.app(),
                                        namespace=query.name_space())

          if not keys_only and props[prop]:
            property_e['property_representation'] = [
                datastore_stub_util._PROPERTY_TYPE_NAMES[tag]
                for tag in sorted(props[prop])]

          kind_properties.append(property_e._ToPb())

        self._stub._SetSchemaCache(app_kind, usekey, kind_properties)

      # The cached list covers the whole kind; narrow it to the requested
      # range here. InQuery closes over the current loop value of `kind`.
      def InQuery(property_e):
        return property_range.Contains(
            (kind, _FinalElement(property_e.key()).name()))
      properties += filter(InQuery, kind_properties)

    # Range filtering is fully applied; no filters/orders remain.
    return (properties, [], [])
class NamespacePseudoKind(object):
  """Pseudo-kind for namespace queries.

  Provides a Query method to perform the actual query.

  Public properties:
    name: the pseudo-kind name
  """
  name = '__namespace__'

  def Query(self, query, filters, orders):
    """Perform a query on this pseudo-kind.

    Args:
      query: the original datastore_pb.Query.
      filters: the filters from query.
      orders: the orders from query.

    Returns:
      (results, remaining_filters, remaining_orders)
      results is a list of entity_pb.EntityProto
      remaining_filters and remaining_orders are the filters and orders that
      should be applied in memory
    """
    namespace_range = datastore_stub_util.ParseNamespaceQuery(query, filters,
                                                              orders)
    target_app = query.app()

    # Collect the distinct namespaces (within range) that hold any entities
    # belonging to the queried app.
    seen = set()
    for app_namespace, _ in self._stub._GetAllEntities():
      (app_id, namespace) = datastore_types.DecodeAppIdNamespace(app_namespace)
      if app_id == target_app and namespace_range.Contains(namespace):
        seen.add(namespace)

    results = []
    for namespace in seen:
      if namespace:
        entity = datastore.Entity(self.name, name=namespace, _app=target_app)
      else:
        # The default (empty) namespace cannot be a key name; it is
        # represented by a reserved numeric id instead.
        entity = datastore.Entity(self.name,
                                  id=datastore_types._EMPTY_NAMESPACE_ID,
                                  _app=target_app)
      results.append(entity._ToPb())

    return (results, [], [])
class DatastoreFileStub(datastore_stub_util.BaseDatastore,
                        apiproxy_stub.APIProxyStub,
                        datastore_stub_util.DatastoreStub):
  """ Persistent stub for the Python datastore API.

  Stores all entities in memory, and persists them to a file as pickled
  protocol buffers. A DatastoreFileStub instance handles a single app's data
  and is backed by files on disk.
  """

  def __init__(self,
               app_id,
               datastore_file,
               history_file=None,
               require_indexes=False,
               service_name='datastore_v3',
               trusted=False,
               consistency_policy=None,
               save_changes=True,
               root_path=None,
               use_atexit=True,
               auto_id_policy=datastore_stub_util.SEQUENTIAL):
    """Constructor.

    Initializes and loads the datastore from the backing files, if they exist.

    Args:
      app_id: string
      datastore_file: string, stores all entities across sessions. Use None
          not to use a file.
      history_file: DEPRECATED. No-op.
      require_indexes: bool, default False. If True, composite indexes must
          exist in index.yaml for queries that need them.
      service_name: Service name expected for all calls.
      trusted: bool, default False. If True, this stub allows an app to
          access the data of another app.
      consistency_policy: The consistency policy to use or None to use the
          default. Consistency policies can be found in
          datastore_stub_util.*ConsistencyPolicy
      save_changes: bool, default True. If this stub should modify
          datastore_file when entities are changed.
      root_path: string, the root path of the app.
      use_atexit: bool, indicates if the stub should save itself atexit.
      auto_id_policy: enum, datastore_stub_util.SEQUENTIAL or .SCATTERED
    """
    self.__datastore_file = datastore_file
    self.__save_changes = save_changes

    # Primary store: (app-namespace str, kind) -> {entity key: _StoredEntity}.
    # Secondary index: entity-group key -> {entity key: entity_pb.EntityProto}.
    self.__entities_by_kind = collections.defaultdict(dict)
    self.__entities_by_group = collections.defaultdict(dict)
    self.__entities_lock = threading.Lock()

    # Cached results of __kind__/__property__ schema queries, invalidated by
    # _StoreEntity/_Delete when entities of a kind change.
    self.__schema_cache = {}

    # Next auto-assigned id, one counter per id space.
    self.__id_counters = {datastore_stub_util.SEQUENTIAL: 1L,
                          datastore_stub_util.SCATTERED: 1L
                         }
    self.__id_lock = threading.Lock()

    # Serializes all reads/writes of the backing pickle file.
    self.__file_lock = threading.Lock()

    datastore_stub_util.BaseDatastore.__init__(
        self, require_indexes, consistency_policy,
        use_atexit and self.__IsSaveable(), auto_id_policy)
    apiproxy_stub.APIProxyStub.__init__(self, service_name)
    # weakref.proxy avoids a strong reference cycle between the stub and its
    # DatastoreStub mixin.
    datastore_stub_util.DatastoreStub.__init__(self, weakref.proxy(self),
                                               app_id, trusted, root_path)

    self._RegisterPseudoKind(KindPseudoKind())
    self._RegisterPseudoKind(PropertyPseudoKind())
    self._RegisterPseudoKind(NamespacePseudoKind())
    self._RegisterPseudoKind(datastore_stub_util.EntityGroupPseudoKind())

    # Load any previously persisted entities.
    self.Read()

  def Clear(self):
    """ Clears the datastore by deleting all currently stored entities and
    queries. """
    self.__entities_lock.acquire()
    try:
      datastore_stub_util.BaseDatastore.Clear(self)
      datastore_stub_util.DatastoreStub.Clear(self)

      self.__entities_by_kind = collections.defaultdict(dict)
      self.__entities_by_group = collections.defaultdict(dict)
      self.__schema_cache = {}
    finally:
      self.__entities_lock.release()

  def _GetAllEntities(self):
    """Get all entities.

    Returns:
      Map from (app-namespace, kind) to {entity key: _StoredEntity} dicts.
      Do not modify directly.
    """
    return self.__entities_by_kind

  def _GetEntityLocation(self, key):
    """Get keys to self.__entities_by_* from the given key.

    Example usage:
      app_kind, eg_k, k = self._GetEntityLocation(key)
      self.__entities_by_kind[app_kind][k]
      self.__entities_by_entity_group[eg_k][k]

    Args:
      key: entity_pb.Reference

    Returns:
      Tuple (by_kind key, by_entity_group key, entity key)
    """
    app_ns = datastore_types.EncodeAppIdNamespace(key.app(), key.name_space())
    kind = _FinalElement(key).type()
    entity_group = datastore_stub_util._GetEntityGroup(key)
    eg_k = datastore_types.ReferenceToKeyValue(entity_group)
    k = datastore_types.ReferenceToKeyValue(key)

    return ((app_ns, kind), eg_k, k)

  def _StoreEntity(self, entity, insert=False):
    """ Store the given entity.

    Any needed locking should be managed by the caller.

    Args:
      entity: The entity_pb.EntityProto to store.
      insert: If we should check for existence.
    """
    app_kind, eg_k, k = self._GetEntityLocation(entity.key())

    assert not insert or k not in self.__entities_by_kind[app_kind]

    self.__entities_by_kind[app_kind][k] = _StoredEntity(entity)
    self.__entities_by_group[eg_k][k] = entity

    # Any cached schema-query results for this kind are now stale.
    if app_kind in self.__schema_cache:
      del self.__schema_cache[app_kind]

  READ_PB_EXCEPTIONS = (ProtocolBuffer.ProtocolBufferDecodeError, LookupError,
                        TypeError, ValueError)
  READ_ERROR_MSG = ('Data in %s is corrupt or a different version. '
                    'Try running with the --clear_datastore flag.\n%r')
  READ_PY250_MSG = ('Are you using FloatProperty and/or GeoPtProperty? '
                    'Unfortunately loading float values from the datastore '
                    'file does not work with Python 2.5.0. '
                    'Please upgrade to a newer Python 2.5 release or use '
                    'the --clear_datastore flag.\n')

  def Read(self):
    """ Reads the datastore and history files into memory.

    The in-memory query history is cleared, but the datastore is *not*
    cleared; the entities in the files are merged into the entities in memory.
    If you want them to overwrite the in-memory datastore, call Clear() before
    calling Read().

    If the datastore file contains an entity with the same app name, kind, and
    key as an entity already in the datastore, the entity from the file
    overwrites the entity in the datastore.

    Also sets each ID counter to one greater than the highest ID allocated so
    far in that counter's ID space.
    """
    if self.__datastore_file and self.__datastore_file != '/dev/null':
      for encoded_entity in self.__ReadPickled(self.__datastore_file):
        try:
          entity = entity_pb.EntityProto(encoded_entity)
        except self.READ_PB_EXCEPTIONS, e:
          raise apiproxy_errors.ApplicationError(
              datastore_pb.Error.INTERNAL_ERROR,
              self.READ_ERROR_MSG % (self.__datastore_file, e))
        except struct.error, e:
          if (sys.version_info[0:3] == (2, 5, 0)
              and e.message.startswith('unpack requires a string argument')):
            # Known Python 2.5.0 failure loading pickled float values; see
            # READ_PY250_MSG.
            raise apiproxy_errors.ApplicationError(
                datastore_pb.Error.INTERNAL_ERROR,
                self.READ_PY250_MSG + self.READ_ERROR_MSG %
                (self.__datastore_file, e))
          else:
            raise

        self._StoreEntity(entity)

        # Advance the matching id counter past any numeric id seen, so
        # future auto-allocated ids cannot collide with loaded entities.
        last_path = _FinalElement(entity.key())
        if last_path.id():
          self._SetMaxId(last_path.id())

  def Write(self):
    """Writes out the datastore and history files.

    Be careful! If the files already exist, this method overwrites them!
    """
    super(DatastoreFileStub, self).Write()
    self.__WriteDatastore()

  def __IsSaveable(self):
    # Persistence is enabled only when a real file path was configured and
    # save_changes was requested.
    return (self.__datastore_file and self.__datastore_file != '/dev/null' and
            self.__save_changes)

  def __WriteDatastore(self):
    """ Writes out the datastore file. Be careful! If the file already exists,
    this method overwrites it!
    """
    if self.__IsSaveable():
      # Persist the pre-encoded protobufs for every entity of every kind.
      encoded = []
      for kind_dict in self.__entities_by_kind.values():
        encoded.extend(entity.encoded_protobuf for entity in kind_dict.values())

      self.__WritePickled(encoded, self.__datastore_file)

  def __ReadPickled(self, filename):
    """Reads a pickled object from the given file and returns it.

    Returns [] (after logging a warning) when the file is missing or empty.

    Raises:
      apiproxy_errors.ApplicationError: if the file cannot be unpickled.
    """
    self.__file_lock.acquire()

    try:
      try:
        if (filename and
            filename != '/dev/null' and
            os.path.isfile(filename) and
            os.stat(filename).st_size > 0):
          # NOTE(review): the file handle is never explicitly closed; it is
          # reclaimed by the garbage collector.
          return pickle.load(open(filename, 'rb'))
        else:
          logging.warning('Could not read datastore data from %s', filename)
      except (AttributeError, LookupError, ImportError, NameError, TypeError,
              ValueError, struct.error, pickle.PickleError), e:
        # Translate any unpickling failure into a datastore-level error with
        # a recovery hint.
        raise apiproxy_errors.ApplicationError(
            datastore_pb.Error.INTERNAL_ERROR,
            'Could not read data from %s. Try running with the '
            '--clear_datastore flag. Cause:\n%r' % (filename, e))
    finally:
      self.__file_lock.release()

    return []

  def __WritePickled(self, obj, filename):
    """Pickles the object and writes it to the given file.
    """
    if not filename or filename == '/dev/null' or not obj:
      return

    # Write to a temp file in the target directory, then rename it over the
    # destination so readers never observe a partially written file.
    descriptor, tmp_filename = tempfile.mkstemp(dir=os.path.dirname(filename))
    tmpfile = os.fdopen(descriptor, 'wb')

    # "fast" mode disables the pickle memo, trading cycle detection for
    # speed; safe here because the entity list contains no shared refs.
    pickler = pickle.Pickler(tmpfile, protocol=1)
    pickler.fast = True
    pickler.dump(obj)

    tmpfile.close()

    self.__file_lock.acquire()
    try:
      try:
        os.rename(tmp_filename, filename)
      except OSError:
        # On platforms where rename cannot replace an existing file
        # (e.g. Windows), remove the destination first and retry.
        try:
          os.remove(filename)
        except:
          pass
        os.rename(tmp_filename, filename)
    finally:
      self.__file_lock.release()

  def MakeSyncCall(self, service, call, request, response, request_id=None):
    """ The main RPC entry point. service must be 'datastore_v3'."""
    self.assertPbIsInitialized(request)
    super(DatastoreFileStub, self).MakeSyncCall(service,
                                                call,
                                                request,
                                                response,
                                                request_id)
    self.assertPbIsInitialized(response)

  def assertPbIsInitialized(self, pb):
    """Raises an exception if the given PB is not initialized and valid."""
    explanation = []
    assert pb.IsInitialized(explanation), explanation

    # Encoding also validates; it raises if the PB is malformed.
    pb.Encode()

  def _GetSchemaCache(self, kind, usekey):
    """Return the cached schema-query result for (kind, usekey), or None."""
    if kind in self.__schema_cache and usekey in self.__schema_cache[kind]:
      return self.__schema_cache[kind][usekey]
    else:
      return None

  def _SetSchemaCache(self, kind, usekey, value):
    """Store a schema-query result under (kind, usekey)."""
    if kind not in self.__schema_cache:
      self.__schema_cache[kind] = {}
    self.__schema_cache[kind][usekey] = value

  def _Put(self, entity, insert):
    """Store a single entity under the entities lock."""
    entity = datastore_stub_util.StoreEntity(entity)

    self.__entities_lock.acquire()
    try:
      self._StoreEntity(entity, insert)
    finally:
      self.__entities_lock.release()

  def _Get(self, key):
    """Return a copy of the entity for key, or None if it does not exist."""
    app_kind, _, k = self._GetEntityLocation(key)

    try:
      return datastore_stub_util.LoadEntity(
          self.__entities_by_kind[app_kind][k].protobuf)
    except KeyError:
      # Missing entity: fall through and implicitly return None.
      pass

  def _Delete(self, key):
    """Delete the entity for key; deleting a nonexistent entity is a no-op."""
    app_kind, eg_k, k = self._GetEntityLocation(key)

    self.__entities_lock.acquire()
    try:
      del self.__entities_by_kind[app_kind][k]
      del self.__entities_by_group[eg_k][k]
      if not self.__entities_by_kind[app_kind]:
        # Drop empty buckets so kind/group iteration stays clean.
        del self.__entities_by_kind[app_kind]
      if not self.__entities_by_group[eg_k]:
        del self.__entities_by_group[eg_k]

      # Schema results for this kind are stale now; a KeyError here (no
      # cached entry) is swallowed along with missing-entity KeyErrors.
      del self.__schema_cache[app_kind]
    except KeyError:
      pass
    finally:
      self.__entities_lock.release()

  def _GetEntitiesInEntityGroup(self, entity_group):
    """Return a shallow copy of {entity key: proto} for the entity group."""
    eg_k = datastore_types.ReferenceToKeyValue(entity_group)
    return self.__entities_by_group[eg_k].copy()

  def _GetQueryCursor(self, query, filters, orders, index_list,
                      filter_predicate=None):
    """Collect candidate entities for query and return an executing cursor."""
    app_id = query.app()
    namespace = query.name_space()

    pseudo_kind = None
    if query.has_kind() and query.kind() in self._pseudo_kinds:
      pseudo_kind = self._pseudo_kinds[query.kind()]

    # Snapshot candidates under the lock; filtering/sorting happens in
    # _ExecuteQuery after the lock is released.
    self.__entities_lock.acquire()
    try:
      app_ns = datastore_types.EncodeAppIdNamespace(app_id, namespace)
      if pseudo_kind:
        # Pseudo-kinds answer the query themselves and may consume some of
        # the filters/orders.
        (results, filters, orders) = pseudo_kind.Query(query, filters, orders)
      elif query.has_kind():
        results = [entity.protobuf for entity in
                   self.__entities_by_kind[app_ns, query.kind()].values()]
      else:
        # Kindless query: scan every kind within this app/namespace.
        results = []
        for (cur_app_ns, _), entities in self.__entities_by_kind.iteritems():
          if cur_app_ns == app_ns:
            results.extend(entity.protobuf for entity in entities.itervalues())
    except KeyError:
      results = []
    finally:
      self.__entities_lock.release()

    return datastore_stub_util._ExecuteQuery(results, query, filters, orders,
                                             index_list, filter_predicate)

  def _SetIdCounter(self, id_space, value):
    """Set the ID counter for id_space to value."""
    self.__id_counters[id_space] = value

  def _IdCounter(self, id_space):
    """Return current value of ID counter for id_space."""
    return self.__id_counters[id_space]

  def _SetMaxId(self, max_id):
    """Infer ID space and advance corresponding counter."""
    count, id_space = datastore_stub_util.IdToCounter(max_id)
    if count >= self._IdCounter(id_space):
      self._SetIdCounter(id_space, count + 1)

  def _AllocateSequentialIds(self, reference, size=1, max_id=None):
    """Allocate a contiguous range of sequential ids; returns (start, end)."""
    datastore_stub_util.Check(not (size and max_id),
                              'Both size and max cannot be set.')

    self.__id_lock.acquire()
    try:
      id_space = datastore_stub_util.SEQUENTIAL
      start = self._IdCounter(id_space)
      if size:
        datastore_stub_util.Check(size > 0, 'Size must be greater than 0.')
        self._SetIdCounter(id_space, start + size)
      elif max_id:
        datastore_stub_util.Check(max_id >=0,
                                  'Max must be greater than or equal to 0.')
        # max_id semantics: only advance the counter, never rewind it.
        self._SetIdCounter(id_space, max(start, max_id + 1))
      end = self._IdCounter(id_space) - 1
    finally:
      self.__id_lock.release()

    return (start, end)

  def _AllocateIds(self, keys):
    """Complete any incomplete keys with scattered ids; returns the keys."""
    self.__id_lock.acquire()
    full_keys = []
    try:
      for key in keys:
        last_element = _FinalElement(key)

        if last_element.id() or last_element.has_name():
          # Already-complete key: just make sure future auto-ids cannot
          # collide with any numeric id along its path.
          for el in key.path().element_list():
            if el.id():
              self._SetMaxId(el.id())

        else:
          # Incomplete key: assign the next id from the scattered space.
          id_space = datastore_stub_util.SCATTERED
          count = self._IdCounter(id_space)
          last_element.set_id(datastore_stub_util.ToScatteredId(count))
          self._SetIdCounter(id_space, count + 1)
        full_keys.append(key)
    finally:
      self.__id_lock.release()

    return full_keys