3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
"""
In-memory persistent stub for the Python datastore API. Gets, queries,
and searches are implemented as in-memory scans over all entities.

Stores entities across sessions as pickled proto bufs in a single file. On
startup, all entities are read from the file and loaded into memory. On
every Put(), the file is wiped and all entities are written from scratch.
Clients can also manually Read() and Write() the file themselves.
"""
import cPickle as pickle
import collections
import logging
import os
import struct
import sys
import tempfile
import threading
import weakref

from google.appengine.api import apiproxy_stub
from google.appengine.api import datastore
from google.appengine.api import datastore_types
from google.appengine.datastore import datastore_pb
from google.appengine.datastore import datastore_stub_util
from google.appengine.datastore import entity_pb
from google.appengine.runtime import apiproxy_errors
from google.net.proto import ProtocolBuffer
def _QueryHash(self):
  """Hash a Query by its canonical encoded protobuf form."""
  return hash(self.Encode())

# Queries need to be hashable (e.g. usable as dict keys); patch __hash__
# onto the generated protobuf class.
datastore_pb.Query.__hash__ = _QueryHash
71 def _FinalElement(key
):
72 """Return final element of a key's path."""
73 return key
.path().element_list()[-1]
76 class _StoredEntity(object):
77 """Simple wrapper around an entity stored by the stub.
80 protobuf: Native protobuf Python object, entity_pb.EntityProto.
81 encoded_protobuf: Encoded binary representation of above protobuf.
84 def __init__(self
, entity
):
85 """Create a _StoredEntity object and store an entity.
88 entity: entity_pb.EntityProto to store.
90 self
.protobuf
= entity
97 self
.encoded_protobuf
= entity
.Encode()
class KindPseudoKind(object):
  """Pseudo-kind for schema queries.

  Provides a Query method to perform the actual query.

  Public properties:
    name: the pseudo-kind name
  """
  name = '__kind__'

  def Query(self, query, filters, orders):
    """Perform a query on this pseudo-kind.

    Args:
      query: the original datastore_pb.Query.
      filters: the filters from query.
      orders: the orders from query.

    Returns:
      (results, remaining_filters, remaining_orders)
      results is a list of entity_pb.EntityProto
      remaining_filters and remaining_orders are the filters and orders that
      should be applied in memory
    """
    kind_range = datastore_stub_util.ParseKindQuery(query, filters, orders)
    app_namespace_str = datastore_types.EncodeAppIdNamespace(
        query.app(), query.name_space())
    kinds = []

    # NOTE(review): self._stub is set externally when this pseudo-kind is
    # registered with the stub -- confirm against _RegisterPseudoKind.
    for app_namespace, kind in self._stub._GetAllEntities():
      if app_namespace != app_namespace_str: continue
      kind = kind.decode('utf-8')
      if not kind_range.Contains(kind): continue
      kinds.append(datastore.Entity(self.name, name=kind, _app=query.app(),
                                    namespace=query.name_space())._ToPb())

    return (kinds, [], [])
class PropertyPseudoKind(object):
  """Pseudo-kind for schema queries.

  Provides a Query method to perform the actual query.

  Public properties:
    name: the pseudo-kind name
  """
  name = '__property__'

  def Query(self, query, filters, orders):
    """Perform a query on this pseudo-kind.

    Args:
      query: the original datastore_pb.Query.
      filters: the filters from query.
      orders: the orders from query.

    Returns:
      (results, remaining_filters, remaining_orders)
      results is a list of entity_pb.EntityProto
      remaining_filters and remaining_orders are the filters and orders that
      should be applied in memory
    """
    property_range = datastore_stub_util.ParsePropertyQuery(query, filters,
                                                            orders)
    keys_only = query.keys_only()
    app_namespace_str = datastore_types.EncodeAppIdNamespace(
        query.app(), query.name_space())

    properties = []
    # Keys-only results are cached separately from full results.
    if keys_only:
      usekey = '__property__keys'
    else:
      usekey = '__property__'

    entities = self._stub._GetAllEntities()
    for app_namespace, kind in entities:
      if app_namespace != app_namespace_str: continue

      app_kind = (app_namespace_str, kind)
      kind = kind.decode('utf-8')

      # Skip kinds that fall entirely outside the requested property range.
      (start_cmp, end_cmp) = property_range.MapExtremes(
          lambda extreme, inclusive, is_end: cmp(kind, extreme[0]))
      if not((start_cmp is None or start_cmp >= 0) and
             (end_cmp is None or end_cmp <= 0)):
        continue

      # Build (or reuse) the cached schema results for this kind.
      kind_properties = self._stub._GetSchemaCache(app_kind, usekey)
      if not kind_properties:
        kind_properties = []
        kind_key = datastore_types.Key.from_path(KindPseudoKind.name, kind,
                                                 _app=query.app(),
                                                 namespace=query.name_space())

        props = collections.defaultdict(set)

        # Collect the set of value-type tags seen for each property name.
        for entity in entities[app_kind].values():
          for prop in entity.protobuf.property_list():
            prop_name = prop.name()

            if (prop_name in
                datastore_stub_util.GetInvisibleSpecialPropertyNames()):
              continue
            value_pb = prop.value()
            props[prop_name].add(datastore_types.GetPropertyValueTag(value_pb))

        # Emit one __property__ entity per property name, parented by kind.
        for prop in sorted(props):
          property_e = datastore.Entity(self.name, name=prop, parent=kind_key,
                                        _app=query.app(),
                                        namespace=query.name_space())

          if not keys_only and props[prop]:
            property_e['property_representation'] = [
                datastore_stub_util._PROPERTY_TYPE_NAMES[tag]
                for tag in sorted(props[prop])]

          kind_properties.append(property_e._ToPb())

        self._stub._SetSchemaCache(app_kind, usekey, kind_properties)

      def InQuery(property_e):
        return property_range.Contains(
            (kind, _FinalElement(property_e.key()).name()))
      properties += filter(InQuery, kind_properties)

    return (properties, [], [])
class NamespacePseudoKind(object):
  """Pseudo-kind for namespace queries.

  Provides a Query method to perform the actual query.

  Public properties:
    name: the pseudo-kind name
  """
  name = '__namespace__'

  def Query(self, query, filters, orders):
    """Perform a query on this pseudo-kind.

    Args:
      query: the original datastore_pb.Query.
      filters: the filters from query.
      orders: the orders from query.

    Returns:
      (results, remaining_filters, remaining_orders)
      results is a list of entity_pb.EntityProto
      remaining_filters and remaining_orders are the filters and orders that
      should be applied in memory
    """
    namespace_range = datastore_stub_util.ParseNamespaceQuery(query, filters,
                                                              orders)
    app_str = query.app()

    namespaces = set()

    for app_namespace, _ in self._stub._GetAllEntities():
      (app_id, namespace) = datastore_types.DecodeAppIdNamespace(app_namespace)
      if app_id == app_str and namespace_range.Contains(namespace):
        namespaces.add(namespace)

    namespace_entities = []
    for namespace in namespaces:
      # The default (empty) namespace cannot use an empty key name, so it
      # is keyed by a reserved numeric ID instead.
      if namespace:
        namespace_e = datastore.Entity(self.name, name=namespace,
                                       _app=query.app())
      else:
        namespace_e = datastore.Entity(self.name,
                                       id=datastore_types._EMPTY_NAMESPACE_ID,
                                       _app=query.app())
      namespace_entities.append(namespace_e._ToPb())

    return (namespace_entities, [], [])
287 class DatastoreFileStub(datastore_stub_util
.BaseDatastore
,
288 apiproxy_stub
.APIProxyStub
,
289 datastore_stub_util
.DatastoreStub
):
290 """ Persistent stub for the Python datastore API.
292 Stores all entities in memory, and persists them to a file as pickled
293 protocol buffers. A DatastoreFileStub instance handles a single app's data
294 and is backed by files on disk.
301 require_indexes
=False,
302 service_name
='datastore_v3',
304 consistency_policy
=None,
308 auto_id_policy
=datastore_stub_util
.SEQUENTIAL
):
311 Initializes and loads the datastore from the backing files, if they exist.
315 datastore_file: string, stores all entities across sessions. Use None
317 history_file: DEPRECATED. No-op.
318 require_indexes: bool, default False. If True, composite indexes must
319 exist in index.yaml for queries that need them.
320 service_name: Service name expected for all calls.
321 trusted: bool, default False. If True, this stub allows an app to
322 access the data of another app.
323 consistency_policy: The consistency policy to use or None to use the
324 default. Consistency policies can be found in
325 datastore_stub_util.*ConsistencyPolicy
326 save_changes: bool, default True. If this stub should modify
327 datastore_file when entities are changed.
328 root_path: string, the root path of the app.
329 use_atexit: bool, indicates if the stub should save itself atexit.
330 auto_id_policy: enum, datastore_stub_util.SEQUENTIAL or .SCATTERED
337 self
.__datastore
_file
= datastore_file
338 self
.__save
_changes
= save_changes
347 self
.__entities
_by
_kind
= collections
.defaultdict(dict)
348 self
.__entities
_by
_group
= collections
.defaultdict(dict)
349 self
.__entities
_lock
= threading
.Lock()
354 self
.__schema
_cache
= {}
356 self
.__id
_counters
= {datastore_stub_util
.SEQUENTIAL
: 1L,
357 datastore_stub_util
.SCATTERED
: 1L
359 self
.__id
_lock
= threading
.Lock()
361 self
.__file
_lock
= threading
.Lock()
363 datastore_stub_util
.BaseDatastore
.__init
__(
364 self
, require_indexes
, consistency_policy
,
365 use_atexit
and self
.__IsSaveable
(), auto_id_policy
)
366 apiproxy_stub
.APIProxyStub
.__init
__(self
, service_name
)
367 datastore_stub_util
.DatastoreStub
.__init
__(self
, weakref
.proxy(self
),
368 app_id
, trusted
, root_path
)
371 self
._RegisterPseudoKind
(KindPseudoKind())
372 self
._RegisterPseudoKind
(PropertyPseudoKind())
373 self
._RegisterPseudoKind
(NamespacePseudoKind())
374 self
._RegisterPseudoKind
(datastore_stub_util
.EntityGroupPseudoKind())
379 """ Clears the datastore by deleting all currently stored entities and
381 self
.__entities
_lock
.acquire()
383 datastore_stub_util
.BaseDatastore
.Clear(self
)
384 datastore_stub_util
.DatastoreStub
.Clear(self
)
386 self
.__entities
_by
_kind
= collections
.defaultdict(dict)
387 self
.__entities
_by
_group
= collections
.defaultdict(dict)
388 self
.__schema
_cache
= {}
390 self
.__entities
_lock
.release()
392 def _GetAllEntities(self
):
396 Map from kind to _StoredEntity() list. Do not modify directly.
398 return self
.__entities
_by
_kind
400 def _GetEntityLocation(self
, key
):
401 """Get keys to self.__entities_by_* from the given key.
404 app_kind, eg_k, k = self._GetEntityLocation(key)
405 self.__entities_by_kind[app_kind][k]
406 self.__entities_by_entity_group[eg_k][k]
409 key: entity_pb.Reference
412 Tuple (by_kind key, by_entity_group key, entity key)
414 app_ns
= datastore_types
.EncodeAppIdNamespace(key
.app(), key
.name_space())
415 kind
= _FinalElement(key
).type()
416 entity_group
= datastore_stub_util
._GetEntityGroup
(key
)
417 eg_k
= datastore_types
.ReferenceToKeyValue(entity_group
)
418 k
= datastore_types
.ReferenceToKeyValue(key
)
420 return ((app_ns
, kind
), eg_k
, k
)
422 def _StoreEntity(self
, entity
, insert
=False):
423 """ Store the given entity.
425 Any needed locking should be managed by the caller.
428 entity: The entity_pb.EntityProto to store.
429 insert: If we should check for existence.
431 app_kind
, eg_k
, k
= self
._GetEntityLocation
(entity
.key())
433 assert not insert
or k
not in self
.__entities
_by
_kind
[app_kind
]
435 self
.__entities
_by
_kind
[app_kind
][k
] = _StoredEntity(entity
)
436 self
.__entities
_by
_group
[eg_k
][k
] = entity
439 if app_kind
in self
.__schema
_cache
:
440 del self
.__schema
_cache
[app_kind
]
  # Exception types that indicate a corrupt or version-incompatible pickled
  # datastore file while decoding entity protobufs.
  READ_PB_EXCEPTIONS = (ProtocolBuffer.ProtocolBufferDecodeError, LookupError,
                        TypeError, ValueError)
  READ_ERROR_MSG = ('Data in %s is corrupt or a different version. '
                    'Try running with the --clear_datastore flag.\n%r')
  # Extra hint shown on Python 2.5.0, where unpickling floats is broken.
  READ_PY250_MSG = ('Are you using FloatProperty and/or GeoPtProperty? '
                    'Unfortunately loading float values from the datastore '
                    'file does not work with Python 2.5.0. '
                    'Please upgrade to a newer Python 2.5 release or use '
                    'the --clear_datastore flag.\n')
453 """ Reads the datastore and history files into memory.
455 The in-memory query history is cleared, but the datastore is *not*
456 cleared; the entities in the files are merged into the entities in memory.
457 If you want them to overwrite the in-memory datastore, call Clear() before
460 If the datastore file contains an entity with the same app name, kind, and
461 key as an entity already in the datastore, the entity from the file
462 overwrites the entity in the datastore.
464 Also sets each ID counter to one greater than the highest ID allocated so
465 far in that counter's ID space.
467 if self
.__datastore
_file
and self
.__datastore
_file
!= '/dev/null':
468 for encoded_entity
in self
.__ReadPickled
(self
.__datastore
_file
):
470 entity
= entity_pb
.EntityProto(encoded_entity
)
471 except self
.READ_PB_EXCEPTIONS
, e
:
472 raise apiproxy_errors
.ApplicationError(
473 datastore_pb
.Error
.INTERNAL_ERROR
,
474 self
.READ_ERROR_MSG
% (self
.__datastore
_file
, e
))
475 except struct
.error
, e
:
476 if (sys
.version_info
[0:3] == (2, 5, 0)
477 and e
.message
.startswith('unpack requires a string argument')):
480 raise apiproxy_errors
.ApplicationError(
481 datastore_pb
.Error
.INTERNAL_ERROR
,
482 self
.READ_PY250_MSG
+ self
.READ_ERROR_MSG
%
483 (self
.__datastore
_file
, e
))
487 self
._StoreEntity
(entity
)
489 last_path
= _FinalElement(entity
.key())
491 self
._SetMaxId
(last_path
.id())
494 """Writes out the datastore and history files.
496 Be careful! If the files already exist, this method overwrites them!
498 super(DatastoreFileStub
, self
).Write()
499 self
.__WriteDatastore
()
501 def __IsSaveable(self
):
502 return (self
.__datastore
_file
and self
.__datastore
_file
!= '/dev/null' and
505 def __WriteDatastore(self
):
506 """ Writes out the datastore file. Be careful! If the file already exists,
507 this method overwrites it!
509 if self
.__IsSaveable
():
511 for kind_dict
in self
.__entities
_by
_kind
.values():
512 encoded
.extend(entity
.encoded_protobuf
for entity
in kind_dict
.values())
514 self
.__WritePickled
(encoded
, self
.__datastore
_file
)
516 def __ReadPickled(self
, filename
):
517 """Reads a pickled object from the given file and returns it.
519 self
.__file
_lock
.acquire()
524 filename
!= '/dev/null' and
525 os
.path
.isfile(filename
) and
526 os
.stat(filename
).st_size
> 0):
527 return pickle
.load(open(filename
, 'rb'))
529 logging
.warning('Could not read datastore data from %s', filename
)
530 except (AttributeError, LookupError, ImportError, NameError, TypeError,
531 ValueError, struct
.error
, pickle
.PickleError
), e
:
534 raise apiproxy_errors
.ApplicationError(
535 datastore_pb
.Error
.INTERNAL_ERROR
,
536 'Could not read data from %s. Try running with the '
537 '--clear_datastore flag. Cause:\n%r' % (filename
, e
))
539 self
.__file
_lock
.release()
543 def __WritePickled(self
, obj
, filename
):
544 """Pickles the object and writes it to the given file.
546 if not filename
or filename
== '/dev/null' or not obj
:
550 descriptor
, tmp_filename
= tempfile
.mkstemp(dir=os
.path
.dirname(filename
))
551 tmpfile
= os
.fdopen(descriptor
, 'wb')
557 pickler
= pickle
.Pickler(tmpfile
, protocol
=1)
563 self
.__file
_lock
.acquire()
567 os
.rename(tmp_filename
, filename
)
574 os
.rename(tmp_filename
, filename
)
576 self
.__file
_lock
.release()
578 def MakeSyncCall(self
, service
, call
, request
, response
, request_id
=None):
579 """ The main RPC entry point. service must be 'datastore_v3'."""
580 self
.assertPbIsInitialized(request
)
581 super(DatastoreFileStub
, self
).MakeSyncCall(service
,
586 self
.assertPbIsInitialized(response
)
588 def assertPbIsInitialized(self
, pb
):
589 """Raises an exception if the given PB is not initialized and valid."""
591 assert pb
.IsInitialized(explanation
), explanation
595 def _GetSchemaCache(self
, kind
, usekey
):
596 if kind
in self
.__schema
_cache
and usekey
in self
.__schema
_cache
[kind
]:
597 return self
.__schema
_cache
[kind
][usekey
]
601 def _SetSchemaCache(self
, kind
, usekey
, value
):
602 if kind
not in self
.__schema
_cache
:
603 self
.__schema
_cache
[kind
] = {}
604 self
.__schema
_cache
[kind
][usekey
] = value
608 def _Put(self
, entity
, insert
):
609 entity
= datastore_stub_util
.StoreEntity(entity
)
611 self
.__entities
_lock
.acquire()
613 self
._StoreEntity
(entity
, insert
)
615 self
.__entities
_lock
.release()
618 app_kind
, _
, k
= self
._GetEntityLocation
(key
)
622 return datastore_stub_util
.LoadEntity(
623 self
.__entities
_by
_kind
[app_kind
][k
].protobuf
)
627 def _Delete(self
, key
):
628 app_kind
, eg_k
, k
= self
._GetEntityLocation
(key
)
630 self
.__entities
_lock
.acquire()
632 del self
.__entities
_by
_kind
[app_kind
][k
]
633 del self
.__entities
_by
_group
[eg_k
][k
]
634 if not self
.__entities
_by
_kind
[app_kind
]:
636 del self
.__entities
_by
_kind
[app_kind
]
637 if not self
.__entities
_by
_group
[eg_k
]:
638 del self
.__entities
_by
_group
[eg_k
]
640 del self
.__schema
_cache
[app_kind
]
645 self
.__entities
_lock
.release()
647 def _GetEntitiesInEntityGroup(self
, entity_group
):
648 eg_k
= datastore_types
.ReferenceToKeyValue(entity_group
)
649 return self
.__entities
_by
_group
[eg_k
].copy()
651 def _GetQueryCursor(self
, query
, filters
, orders
, index_list
,
652 filter_predicate
=None):
654 namespace
= query
.name_space()
657 if query
.has_kind() and query
.kind() in self
._pseudo
_kinds
:
658 pseudo_kind
= self
._pseudo
_kinds
[query
.kind()]
663 self
.__entities
_lock
.acquire()
665 app_ns
= datastore_types
.EncodeAppIdNamespace(app_id
, namespace
)
668 (results
, filters
, orders
) = pseudo_kind
.Query(query
, filters
, orders
)
669 elif query
.has_kind():
670 results
= [entity
.protobuf
for entity
in
671 self
.__entities
_by
_kind
[app_ns
, query
.kind()].values()]
674 for (cur_app_ns
, _
), entities
in self
.__entities
_by
_kind
.iteritems():
675 if cur_app_ns
== app_ns
:
676 results
.extend(entity
.protobuf
for entity
in entities
.itervalues())
680 self
.__entities
_lock
.release()
682 return datastore_stub_util
._ExecuteQuery
(results
, query
, filters
, orders
,
683 index_list
, filter_predicate
)
685 def _SetIdCounter(self
, id_space
, value
):
686 """Set the ID counter for id_space to value."""
687 self
.__id
_counters
[id_space
] = value
689 def _IdCounter(self
, id_space
):
690 """Return current value of ID counter for id_space."""
691 return self
.__id
_counters
[id_space
]
693 def _SetMaxId(self
, max_id
):
694 """Infer ID space and advance corresponding counter."""
695 count
, id_space
= datastore_stub_util
.IdToCounter(max_id
)
696 if count
>= self
._IdCounter
(id_space
):
697 self
._SetIdCounter
(id_space
, count
+ 1)
699 def _AllocateSequentialIds(self
, reference
, size
=1, max_id
=None):
700 datastore_stub_util
.Check(not (size
and max_id
),
701 'Both size and max cannot be set.')
703 self
.__id
_lock
.acquire()
705 id_space
= datastore_stub_util
.SEQUENTIAL
706 start
= self
._IdCounter
(id_space
)
708 datastore_stub_util
.Check(size
> 0, 'Size must be greater than 0.')
709 self
._SetIdCounter
(id_space
, start
+ size
)
711 datastore_stub_util
.Check(max_id
>=0,
712 'Max must be greater than or equal to 0.')
713 self
._SetIdCounter
(id_space
, max(start
, max_id
+ 1))
714 end
= self
._IdCounter
(id_space
) - 1
716 self
.__id
_lock
.release()
720 def _AllocateIds(self
, keys
):
721 self
.__id
_lock
.acquire()
725 last_element
= _FinalElement(key
)
727 if last_element
.id() or last_element
.has_name():
728 for el
in key
.path().element_list():
730 self
._SetMaxId
(el
.id())
733 id_space
= datastore_stub_util
.SCATTERED
734 count
= self
._IdCounter
(id_space
)
735 last_element
.set_id(datastore_stub_util
.ToScatteredId(count
))
736 self
._SetIdCounter
(id_space
, count
+ 1)
737 full_keys
.append(key
)
739 self
.__id
_lock
.release()