3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
23 In-memory persistent stub for the Python datastore API. Gets, queries,
24 and searches are implemented as in-memory scans over all entities.
26 Stores entities across sessions as pickled proto bufs in a single file. On
27 startup, all entities are read from the file and loaded into memory. On
28 every Put(), the file is wiped and all entities are written from scratch.
29 Clients can also manually Read() and Write() the file themselves.
54 import cPickle
as pickle
56 from google
.appengine
.api
import apiproxy_stub
57 from google
.appengine
.api
import datastore
58 from google
.appengine
.api
import datastore_types
59 from google
.appengine
.datastore
import datastore_pb
60 from google
.appengine
.datastore
import datastore_stub_util
61 from google
.appengine
.runtime
import apiproxy_errors
62 from google
.net
.proto
import ProtocolBuffer
63 from google
.appengine
.datastore
import entity_pb
# Monkeypatch: make datastore_pb.Query hashable by its encoded byte form so
# queries can serve as dict keys / set members inside the stub.
datastore_pb.Query.__hash__ = lambda self: hash(self.Encode())
70 def _FinalElement(key
):
71 """Return final element of a key's path."""
72 return key
.path().element_list()[-1]
75 class _StoredEntity(object):
76 """Simple wrapper around an entity stored by the stub.
79 protobuf: Native protobuf Python object, entity_pb.EntityProto.
80 encoded_protobuf: Encoded binary representation of above protobuf.
83 def __init__(self
, entity
):
84 """Create a _StoredEntity object and store an entity.
87 entity: entity_pb.EntityProto to store.
89 self
.protobuf
= entity
96 self
.encoded_protobuf
= entity
.Encode()
class KindPseudoKind(object):
  """Pseudo-kind for schema queries.

  Provides a Query method to perform the actual query.

  Attributes:
    name: the pseudo-kind name
  """
  # NOTE(review): the `name` class attribute assignment is not visible in
  # this chunk of the file — confirm against the full source.

  def Query(self, query, filters, orders):
    """Perform a query on this pseudo-kind.

    Args:
      query: the original datastore_pb.Query.
      filters: the filters from query.
      orders: the orders from query.

    Returns:
      (results, remaining_filters, remaining_orders)
      results is a list of entity_pb.EntityProto
      remaining_filters and remaining_orders are the filters and orders that
      should be applied in memory
    """
    kind_range = datastore_stub_util.ParseKindQuery(query, filters, orders)
    app_namespace_str = datastore_types.EncodeAppIdNamespace(
        query.app(), query.name_space())
    # NOTE(review): the accumulator initialization (`kinds = []`) sits on a
    # line elided from this chunk.
    # Iterating the entity map yields its (app_namespace, kind) keys.
    for app_namespace, kind in self._stub._GetAllEntities():
      if app_namespace != app_namespace_str: continue
      kind = kind.decode('utf-8')
      # Only report kinds inside the range requested by the query.
      if not kind_range.Contains(kind): continue
      kinds.append(datastore.Entity(self.name, name=kind, _app=query.app(),
                                    namespace=query.name_space())._ToPb())

    # All filters/orders were consumed by ParseKindQuery; nothing remains
    # to apply in memory.
    return (kinds, [], [])
class PropertyPseudoKind(object):
  """Pseudo-kind for schema queries.

  Provides a Query method to perform the actual query.

  Attributes:
    name: the pseudo-kind name
  """
  name = '__property__'

  def Query(self, query, filters, orders):
    """Perform a query on this pseudo-kind.

    Args:
      query: the original datastore_pb.Query.
      filters: the filters from query.
      orders: the orders from query.

    Returns:
      (results, remaining_filters, remaining_orders)
      results is a list of entity_pb.EntityProto
      remaining_filters and remaining_orders are the filters and orders that
      should be applied in memory
    """
    # NOTE(review): the trailing arguments and closing paren of this call
    # are elided from this chunk.
    property_range = datastore_stub_util.ParsePropertyQuery(query, filters,
    keys_only = query.keys_only()
    app_namespace_str = datastore_types.EncodeAppIdNamespace(
        query.app(), query.name_space())

    # Schema-cache key; keys-only results are cached separately.
    # NOTE(review): these two assignments belong to an if/else on keys_only
    # whose branch header lines are elided from this chunk.
    usekey = '__property__keys'
    usekey = '__property__'

    entities = self._stub._GetAllEntities()
    # Iterating the entity map yields its (app_namespace, kind) keys.
    for app_namespace, kind in entities:
      if app_namespace != app_namespace_str: continue

      app_kind = (app_namespace_str, kind)
      kind = kind.decode('utf-8')

      # Skip kinds that fall entirely outside the requested property range.
      (start_cmp, end_cmp) = property_range.MapExtremes(
          lambda extreme, inclusive, is_end: cmp(kind, extreme[0]))
      if not((start_cmp is None or start_cmp >= 0) and
             (end_cmp is None or end_cmp <= 0)):
        # NOTE(review): the `continue` statement of this branch is elided
        # from this chunk.

      kind_properties = self._stub._GetSchemaCache(app_kind, usekey)
      if not kind_properties:
        # Cache miss: scan every entity of this kind and collect, per
        # property name, the set of value-type tags seen.
        kind_key = datastore_types.Key.from_path(KindPseudoKind.name, kind,
                                                 namespace=query.name_space())

        props = collections.defaultdict(set)

        for entity in entities[app_kind].values():
          for prop in entity.protobuf.property_list():
            prop_name = prop.name()

            # NOTE(review): the `if` guard whose condition ends on the next
            # line is partially elided from this chunk.
                datastore_stub_util.GetInvisibleSpecialPropertyNames()):
              value_pb = prop.value()
              props[prop_name].add(datastore_types.GetPropertyValueTag(value_pb))

        # Build one __property__ entity per property name; sorted for
        # deterministic result ordering.
        for prop in sorted(props):
          property_e = datastore.Entity(self.name, name=prop, parent=kind_key,
                                        namespace=query.name_space())

          if not keys_only and props[prop]:
            property_e['property_representation'] = [
                datastore_stub_util._PROPERTY_TYPE_NAMES[tag]
                for tag in sorted(props[prop])]

          kind_properties.append(property_e._ToPb())

        self._stub._SetSchemaCache(app_kind, usekey, kind_properties)

      # Keep only cached properties that fall inside the query's range.
      def InQuery(property_e):
        return property_range.Contains(
            (kind, _FinalElement(property_e.key()).name()))
      # NOTE(review): `properties` accumulator initialization is elided from
      # this chunk.
      properties += filter(InQuery, kind_properties)

    return (properties, [], [])
class NamespacePseudoKind(object):
  """Pseudo-kind for namespace queries.

  Provides a Query method to perform the actual query.

  Attributes:
    name: the pseudo-kind name
  """
  name = '__namespace__'

  def Query(self, query, filters, orders):
    """Perform a query on this pseudo-kind.

    Args:
      query: the original datastore_pb.Query.
      filters: the filters from query.
      orders: the orders from query.

    Returns:
      (results, remaining_filters, remaining_orders)
      results is a list of entity_pb.EntityProto
      remaining_filters and remaining_orders are the filters and orders that
      should be applied in memory
    """
    # NOTE(review): the trailing arguments and closing paren of this call
    # are elided from this chunk.
    namespace_range = datastore_stub_util.ParseNamespaceQuery(query, filters,
    app_str = query.app()

    # Collect the distinct namespaces used by this app's stored entities.
    # NOTE(review): the accumulator initialization (`namespaces = set()`)
    # sits on a line elided from this chunk.
    for app_namespace, _ in self._stub._GetAllEntities():
      (app_id, namespace) = datastore_types.DecodeAppIdNamespace(app_namespace)
      if app_id == app_str and namespace_range.Contains(namespace):
        namespaces.add(namespace)

    # Materialize one __namespace__ entity per namespace found.
    namespace_entities = []
    for namespace in namespaces:
      # NOTE(review): the if/else distinguishing a named namespace from the
      # empty/default namespace is elided here; both Entity constructions
      # (and their continuation lines) are only partially visible.
      namespace_e = datastore.Entity(self.name, name=namespace,
      namespace_e = datastore.Entity(self.name,
                                     id=datastore_types._EMPTY_NAMESPACE_ID,
      namespace_entities.append(namespace_e._ToPb())

    return (namespace_entities, [], [])
class DatastoreFileStub(datastore_stub_util.BaseDatastore,
                        apiproxy_stub.APIProxyStub,
                        datastore_stub_util.DatastoreStub):
  """ Persistent stub for the Python datastore API.

  Stores all entities in memory, and persists them to a file as pickled
  protocol buffers. A DatastoreFileStub instance handles a single app's data
  and is backed by files on disk.
  """
  # NOTE(review): the `def __init__(...)` line and its leading parameters
  # (app_id, datastore_file, history_file, ...) are elided from this chunk;
  # only the trailing defaulted parameters are visible below.
               require_indexes=False,
               service_name='datastore_v3',
               consistency_policy=None,
               auto_id_policy=datastore_stub_util.SEQUENTIAL):
    """Constructor.

    Initializes and loads the datastore from the backing files, if they exist.

    Args:
      datastore_file: string, stores all entities across sessions. Use None
        (rest of this description elided in this chunk).
      history_file: DEPRECATED. No-op.
      require_indexes: bool, default False. If True, composite indexes must
        exist in index.yaml for queries that need them.
      service_name: Service name expected for all calls.
      trusted: bool, default False. If True, this stub allows an app to
        access the data of another app.
      consistency_policy: The consistency policy to use or None to use the
        default. Consistency policies can be found in
        datastore_stub_util.*ConsistencyPolicy
      save_changes: bool, default True. If this stub should modify
        datastore_file when entities are changed.
      root_path: string, the root path of the app.
      use_atexit: bool, indicates if the stub should save itself atexit.
      auto_id_policy: enum, datastore_stub_util.SEQUENTIAL or .SCATTERED
    """
    self.__datastore_file = datastore_file
    self.__save_changes = save_changes

    # In-memory entity maps, keyed two ways (values wrap _StoredEntity /
    # EntityProto):
    #   __entities_by_kind[(app_ns, kind)][key]
    #   __entities_by_group[entity_group_key][key]
    self.__entities_by_kind = collections.defaultdict(dict)
    self.__entities_by_group = collections.defaultdict(dict)
    self.__entities_lock = threading.Lock()

    # Cache for pseudo-kind schema query results; invalidated on writes.
    self.__schema_cache = {}

    # Next unallocated ID per ID space.
    self.__id_counters = {datastore_stub_util.SEQUENTIAL: 1L,
                          datastore_stub_util.SCATTERED: 1L
    # NOTE(review): the closing brace of this dict literal is elided here.
    self.__id_lock = threading.Lock()

    # Serializes reads/writes of the backing datastore file.
    self.__file_lock = threading.Lock()

    datastore_stub_util.BaseDatastore.__init__(
        self, require_indexes, consistency_policy,
        use_atexit and self.__IsSaveable(), auto_id_policy)
    apiproxy_stub.APIProxyStub.__init__(self, service_name)
    datastore_stub_util.DatastoreStub.__init__(self, weakref.proxy(self),
                                               app_id, trusted, root_path)

    # Register the built-in pseudo-kinds handled by this stub.
    self._RegisterPseudoKind(KindPseudoKind())
    self._RegisterPseudoKind(PropertyPseudoKind())
    self._RegisterPseudoKind(NamespacePseudoKind())
    self._RegisterPseudoKind(datastore_stub_util.EntityGroupPseudoKind())
378 """ Clears the datastore by deleting all currently stored entities and
380 self
.__entities
_lock
.acquire()
382 datastore_stub_util
.BaseDatastore
.Clear(self
)
383 datastore_stub_util
.DatastoreStub
.Clear(self
)
385 self
.__entities
_by
_kind
= collections
.defaultdict(dict)
386 self
.__entities
_by
_group
= collections
.defaultdict(dict)
387 self
.__schema
_cache
= {}
389 self
.__entities
_lock
.release()
  def _GetAllEntities(self):
    """Return the live in-memory entity map (not a copy).

    Returns:
      Map from kind to _StoredEntity() list. Do not modify directly.
    """
    return self.__entities_by_kind
  def _GetEntityLocation(self, key):
    """Get keys to self.__entities_by_* from the given key.

    Example usage:
      app_kind, eg_k, k = self._GetEntityLocation(key)
      self.__entities_by_kind[app_kind][k]
      self.__entities_by_entity_group[eg_k][k]

    Args:
      key: entity_pb.Reference

    Returns:
      Tuple (by_kind key, by_entity_group key, entity key)
    """
    app_ns = datastore_types.EncodeAppIdNamespace(key.app(), key.name_space())
    kind = _FinalElement(key).type()
    entity_group = datastore_stub_util._GetEntityGroup(key)
    eg_k = datastore_types.ReferenceToKeyValue(entity_group)
    k = datastore_types.ReferenceToKeyValue(key)

    return ((app_ns, kind), eg_k, k)
  def _StoreEntity(self, entity, insert=False):
    """ Store the given entity.

    Any needed locking should be managed by the caller.

    Args:
      entity: The entity_pb.EntityProto to store.
      insert: If we should check for existence.
    """
    app_kind, eg_k, k = self._GetEntityLocation(entity.key())

    # Best-effort duplicate check only: assert is stripped under python -O.
    assert not insert or k not in self.__entities_by_kind[app_kind]

    self.__entities_by_kind[app_kind][k] = _StoredEntity(entity)
    self.__entities_by_group[eg_k][k] = entity

    # A new/changed entity may alter the kind's schema; drop the cached
    # schema-query results for this (app_ns, kind).
    if app_kind in self.__schema_cache:
      del self.__schema_cache[app_kind]
  # Exception types that indicate a corrupt or version-incompatible pickled
  # datastore file while decoding entity protos in Read().
  READ_PB_EXCEPTIONS = (ProtocolBuffer.ProtocolBufferDecodeError, LookupError,
                        TypeError, ValueError)
  READ_ERROR_MSG = ('Data in %s is corrupt or a different version. '
                    'Try running with the --clear_datastore flag.\n%r')
  # Extra hint shown for the Python 2.5.0 float-unpickling bug (see Read()).
  READ_PY250_MSG = ('Are you using FloatProperty and/or GeoPtProperty? '
                    'Unfortunately loading float values from the datastore '
                    'file does not work with Python 2.5.0. '
                    'Please upgrade to a newer Python 2.5 release or use '
                    'the --clear_datastore flag.\n')
452 """ Reads the datastore and history files into memory.
454 The in-memory query history is cleared, but the datastore is *not*
455 cleared; the entities in the files are merged into the entities in memory.
456 If you want them to overwrite the in-memory datastore, call Clear() before
459 If the datastore file contains an entity with the same app name, kind, and
460 key as an entity already in the datastore, the entity from the file
461 overwrites the entity in the datastore.
463 Also sets each ID counter to one greater than the highest ID allocated so
464 far in that counter's ID space.
466 if self
.__datastore
_file
and self
.__datastore
_file
!= '/dev/null':
467 for encoded_entity
in self
.__ReadPickled
(self
.__datastore
_file
):
469 entity
= entity_pb
.EntityProto(encoded_entity
)
470 except self
.READ_PB_EXCEPTIONS
, e
:
471 raise apiproxy_errors
.ApplicationError(
472 datastore_pb
.Error
.INTERNAL_ERROR
,
473 self
.READ_ERROR_MSG
% (self
.__datastore
_file
, e
))
474 except struct
.error
, e
:
475 if (sys
.version_info
[0:3] == (2, 5, 0)
476 and e
.message
.startswith('unpack requires a string argument')):
479 raise apiproxy_errors
.ApplicationError(
480 datastore_pb
.Error
.INTERNAL_ERROR
,
481 self
.READ_PY250_MSG
+ self
.READ_ERROR_MSG
%
482 (self
.__datastore
_file
, e
))
486 self
._StoreEntity
(entity
)
488 last_path
= _FinalElement(entity
.key())
490 self
._SetMaxId
(last_path
.id())
493 """Writes out the datastore and history files.
495 Be careful! If the files already exist, this method overwrites them!
497 super(DatastoreFileStub
, self
).Write()
498 self
.__WriteDatastore
()
  def __IsSaveable(self):
    # True only when there is a real backing file to persist to.
    # NOTE(review): the remainder of this condition (presumably ending with
    # `self.__save_changes)`) is elided from this chunk.
    return (self.__datastore_file and self.__datastore_file != '/dev/null' and
  def __WriteDatastore(self):
    """ Writes out the datastore file. Be careful! If the file already exists,
    this method overwrites it!
    """
    if self.__IsSaveable():
      # Flatten every kind bucket into one list of encoded entity protos.
      # NOTE(review): the accumulator initialization (`encoded = []`) sits
      # on a line elided from this chunk.
      for kind_dict in self.__entities_by_kind.values():
        encoded.extend(entity.encoded_protobuf for entity in kind_dict.values())

      self.__WritePickled(encoded, self.__datastore_file)
  def __ReadPickled(self, filename):
    """Reads a pickled object from the given file and returns it.
    """
    self.__file_lock.acquire()

    # NOTE(review): the `try:` line and the opening of the `if (filename
    # and ...` condition are elided from this chunk.
        filename != '/dev/null' and
        os.path.isfile(filename) and
        os.stat(filename).st_size > 0):
        return pickle.load(open(filename, 'rb'))
      # NOTE(review): the `else:` branch header is elided here.
        logging.warning('Could not read datastore data from %s', filename)
    except (AttributeError, LookupError, ImportError, NameError, TypeError,
            ValueError, struct.error, pickle.PickleError), e:
      # Any unpickling failure is reported as a corrupt datastore file.
      raise apiproxy_errors.ApplicationError(
          datastore_pb.Error.INTERNAL_ERROR,
          'Could not read data from %s. Try running with the '
          '--clear_datastore flag. Cause:\n%r' % (filename, e))
    # NOTE(review): the `finally:` header pairing acquire with release is
    # elided here.
      self.__file_lock.release()
  def __WritePickled(self, obj, filename):
    """Pickles the object and writes it to the given file.
    """
    # Nothing to do for a missing/null target or an empty object.
    if not filename or filename == '/dev/null' or not obj:
      # NOTE(review): the `return` of this early-exit branch is elided from
      # this chunk.

    # Write into a temp file in the same directory, then rename it over the
    # target, so readers never observe a partially-written file.
    descriptor, tmp_filename = tempfile.mkstemp(dir=os.path.dirname(filename))
    tmpfile = os.fdopen(descriptor, 'wb')

    # Pickle protocol 1 keeps the on-disk format stable across sessions.
    pickler = pickle.Pickler(tmpfile, protocol=1)
    # NOTE(review): the dump/close lines following the Pickler creation are
    # elided from this chunk.

    self.__file_lock.acquire()
    # NOTE(review): the try block and the branch structure around these two
    # rename calls (visible twice below) are elided from this chunk.
      os.rename(tmp_filename, filename)

      os.rename(tmp_filename, filename)
    # NOTE(review): the `finally:` header pairing acquire with release is
    # elided here.
      self.__file_lock.release()
  def MakeSyncCall(self, service, call, request, response, request_id=None):
    """ The main RPC entry point. service must be 'datastore_v3'."""
    # Validate request before dispatch, and response before returning.
    self.assertPbIsInitialized(request)
    # NOTE(review): the remaining arguments of this super() call (call,
    # request, response, ...) and its closing paren are elided from this
    # chunk.
    super(DatastoreFileStub, self).MakeSyncCall(service,
    self.assertPbIsInitialized(response)
  def assertPbIsInitialized(self, pb):
    """Raises an exception if the given PB is not initialized and valid."""
    # NOTE(review): the line(s) creating `explanation` are elided from this
    # chunk.
    assert pb.IsInitialized(explanation), explanation
  def _GetSchemaCache(self, kind, usekey):
    """Return cached schema-query results for (kind, usekey), if present."""
    if kind in self.__schema_cache and usekey in self.__schema_cache[kind]:
      return self.__schema_cache[kind][usekey]
    # NOTE(review): the cache-miss branch (presumably returning None) is
    # elided from this chunk.
  def _SetSchemaCache(self, kind, usekey, value):
    """Cache schema-query results *value* under (kind, usekey)."""
    if kind not in self.__schema_cache:
      self.__schema_cache[kind] = {}
    self.__schema_cache[kind][usekey] = value
  def _Put(self, entity, insert):
    """Store *entity*, holding the entities lock while mutating the maps."""
    entity = datastore_stub_util.StoreEntity(entity)

    self.__entities_lock.acquire()
    # NOTE(review): the try/finally pairing this acquire with the release
    # below is elided from this chunk.
    self._StoreEntity(entity, insert)
    self.__entities_lock.release()
  # NOTE(review): the `def _Get(self, key):` line is elided from this chunk.
    app_kind, _, k = self._GetEntityLocation(key)

    # NOTE(review): lines between the lookup and this return (presumably a
    # missing-key guard) are elided from this chunk.
    return datastore_stub_util.LoadEntity(
        self.__entities_by_kind[app_kind][k].protobuf)
  def _Delete(self, key):
    """Remove the entity at *key* from both maps and drop its schema cache."""
    app_kind, eg_k, k = self._GetEntityLocation(key)

    self.__entities_lock.acquire()
    # NOTE(review): the try/finally structure around these deletions is
    # elided from this chunk.
    del self.__entities_by_kind[app_kind][k]
    del self.__entities_by_group[eg_k][k]
    # Prune now-empty inner dicts so later scans skip dead buckets.
    if not self.__entities_by_kind[app_kind]:
      del self.__entities_by_kind[app_kind]
    if not self.__entities_by_group[eg_k]:
      del self.__entities_by_group[eg_k]

    # NOTE(review): a membership guard preceding this deletion is elided
    # from this chunk.
    del self.__schema_cache[app_kind]

    self.__entities_lock.release()
  def _GetEntitiesInEntityGroup(self, entity_group):
    """Return a shallow copy of the {key: entity} map for *entity_group*."""
    eg_k = datastore_types.ReferenceToKeyValue(entity_group)
    # .copy() so the caller can iterate safely without holding our lock.
    return self.__entities_by_group[eg_k].copy()
  def _GetQueryCursor(self, query, filters, orders, index_list,
                      filter_predicate=None):
    """Collect candidate entities for *query* and hand off to the executor."""
    namespace = query.name_space()
    # NOTE(review): the `app_id` assignment (presumably `query.app()`) is
    # elided from this chunk.

    # Pseudo-kind queries (e.g. __kind__) are delegated to their handler.
    if query.has_kind() and query.kind() in self._pseudo_kinds:
      pseudo_kind = self._pseudo_kinds[query.kind()]
    # NOTE(review): the else branch clearing pseudo_kind is elided here.

    self.__entities_lock.acquire()
    # NOTE(review): the try/finally pairing acquire with release, and the
    # branch header guarding the pseudo-kind call, are elided from this
    # chunk.
    app_ns = datastore_types.EncodeAppIdNamespace(app_id, namespace)
    (results, filters, orders) = pseudo_kind.Query(query, filters, orders)
    elif query.has_kind():
      # Kind query: scan only the matching (app_ns, kind) bucket.
      results = [entity.protobuf for entity in
                 self.__entities_by_kind[app_ns, query.kind()].values()]
    # NOTE(review): the `else:` header (kindless query) is elided here.
      # Kindless query: scan every bucket belonging to this app/namespace.
      for (cur_app_ns, _), entities in self.__entities_by_kind.iteritems():
        if cur_app_ns == app_ns:
          results.extend(entity.protobuf for entity in entities.itervalues())
    self.__entities_lock.release()

    return datastore_stub_util._ExecuteQuery(results, query, filters, orders,
                                             index_list, filter_predicate)
  def _SetIdCounter(self, id_space, value):
    """Set the ID counter for id_space to value.

    Args:
      id_space: datastore_stub_util.SEQUENTIAL or .SCATTERED.
      value: new integer counter value.
    """
    self.__id_counters[id_space] = value
  def _IdCounter(self, id_space):
    """Return current value of ID counter for id_space.

    Args:
      id_space: datastore_stub_util.SEQUENTIAL or .SCATTERED.
    """
    return self.__id_counters[id_space]
  def _SetMaxId(self, max_id):
    """Infer ID space and advance corresponding counter.

    Moves the counter for max_id's ID space past max_id so the stub never
    hands out an ID it has already seen.
    """
    count, id_space = datastore_stub_util.IdToCounter(max_id)
    if count >= self._IdCounter(id_space):
      self._SetIdCounter(id_space, count + 1)
  def _AllocateSequentialIds(self, reference, size=1, max_id=None):
    """Allocate a range of sequential IDs (parts of this method are elided).

    Exactly one of size / max_id may be set.
    """
    datastore_stub_util.Check(not (size and max_id),
                              'Both size and max cannot be set.')

    self.__id_lock.acquire()
    # NOTE(review): the try/finally pairing acquire with release, the
    # `if size:` / `elif max_id:` branch headers between the two allocation
    # strategies below, and the final return are elided from this chunk.
    id_space = datastore_stub_util.SEQUENTIAL
    start = self._IdCounter(id_space)
    # Strategy 1: allocate a fixed-size block [start, start + size).
    datastore_stub_util.Check(size > 0, 'Size must be greater than 0.')
    self._SetIdCounter(id_space, start + size)
    # Strategy 2: reserve everything up to and including max_id.
    datastore_stub_util.Check(max_id >=0,
                              'Max must be greater than or equal to 0.')
    self._SetIdCounter(id_space, max(start, max_id + 1))
    end = self._IdCounter(id_space) - 1

    self.__id_lock.release()
  def _AllocateIds(self, keys):
    """Assign scattered IDs to incomplete keys.

    NOTE(review): several structural lines (try/finally, the loop header
    over `keys`, the `full_keys` accumulator init, an `else:` header, and
    the return) are elided from this chunk, and the method may continue
    past the visible source.
    """
    self.__id_lock.acquire()
    last_element = _FinalElement(key)

    if last_element.id() or last_element.has_name():
      # Key already complete: just advance counters past any numeric ids
      # appearing anywhere in its path.
      for el in key.path().element_list():
        # NOTE(review): a guard line (presumably `if el.id():`) is elided
        # before this call.
        self._SetMaxId(el.id())

    # NOTE(review): the `else:` header is elided here.
      # Incomplete key: assign the next ID from the scattered space.
      id_space = datastore_stub_util.SCATTERED
      count = self._IdCounter(id_space)
      last_element.set_id(datastore_stub_util.ToScatteredId(count))
      self._SetIdCounter(id_space, count + 1)
    full_keys.append(key)

    self.__id_lock.release()