#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
33 """Mapreduce execution context.
35 Mapreduce context provides handler code with information about
36 current mapreduce execution and organizes utility data flow
37 from handlers such as counters, log messages, mutation pools.
38 """

__all__ = ["get",
           "Context",
           "Counters",
           "EntityList",
           "ItemList",
           "MutationPool",
           "COUNTER_MAPPER_CALLS",
           "COUNTER_MAPPER_WALLTIME_MS",
           "DATASTORE_DEADLINE",
           "MAX_ENTITY_COUNT",
           "MAX_POOL_SIZE",
          ]

import threading

from google.appengine.api import datastore
from google.appengine.ext import db

try:
  from google.appengine.ext import ndb
except ImportError:
  ndb = None
# It is acceptable to set key_range.ndb to the ndb module,
# imported through some other way (e.g. from the app dir).


# Maximum pool size in bytes. A pool list is flushed before its byte size
# would exceed this amount.
MAX_POOL_SIZE = 900 * 1000

# Maximum number of items in a pool list before it is flushed.
MAX_ENTITY_COUNT = 500

# Deadline in seconds for mutation pool datastore operations.
DATASTORE_DEADLINE = 15

# Name of the counter that tracks the number of mapper handler calls.
COUNTER_MAPPER_CALLS = "mapper-calls"

# Name of the counter that tracks total mapper walltime in milliseconds,
# including i/o overhead, not just handler execution time.
COUNTER_MAPPER_WALLTIME_MS = "mapper-walltime-ms"


def _normalize_entity(value):
  """Return an entity from an entity or model instance.

  Returns None for NDB models, which callers route to the ndb-specific paths.
  """
  if ndb is not None and isinstance(value, ndb.Model):
    return None
  if getattr(value, "_populate_internal_entity", None):
    return value._populate_internal_entity()
  return value


def _normalize_key(value):
  """Return a key from an entity, model instance, key, or key string.

  Returns None for NDB models and keys, which callers route to the
  ndb-specific paths.
  """
  if ndb is not None and isinstance(value, (ndb.Model, ndb.Key)):
    return None
  if getattr(value, "key", None):
    return value.key()
  elif isinstance(value, basestring):
    return datastore.Key(value)
  else:
    return value
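

# Illustrative sketch (assumption, not in the original source; the Guestbook
# model is hypothetical): given a db.Model instance, _normalize_key() returns
# the entity's key, while an encoded key string is parsed into datastore.Key.
# NDB values fall through to None so MutationPool can dispatch them to the
# ndb_put()/ndb_delete() paths instead.
#
#   class Guestbook(db.Model):
#     content = db.StringProperty()
#
#   entry = Guestbook(content="hi")
#   entry.put()
#   key = _normalize_key(entry)              # the entity's key
#   same = _normalize_key(str(entry.key()))  # parsed from the encoded string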


class ItemList(object):
  """Holds a list of arbitrary items, and their total size.

  Properties:
    items: list of objects.
    length: length of item list.
    size: aggregate item size in bytes.
  """

  def __init__(self):
    """Constructor."""
    self.items = []
    self.length = 0
    self.size = 0

  def append(self, item, item_size):
    """Add new item to the list.

    Args:
      item: an item to add to the list.
      item_size: item size in bytes as int.
    """
    self.items.append(item)
    self.length += 1
    self.size += item_size

  def clear(self):
    """Clear item list."""
    self.items = []
    self.length = 0
    self.size = 0

  @property
  def entities(self):
    """Return items. For backwards compatibility."""
    return self.items


# For backwards compatibility.
EntityList = ItemList
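

# Illustrative sketch (not part of the original source): ItemList tracks both
# the number of buffered items and their aggregate byte size, which is what
# MutationPool (below) checks against its flush thresholds.
#
#   batch = ItemList()
#   batch.append("payload", item_size=len("payload"))
#   assert batch.length == 1 and batch.size == 7
#   batch.clear()  # resets items, length, and size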


class MutationPool(object):
  """Mutation pool accumulates datastore changes to perform them in batch.

  Properties:
    puts: ItemList of entities to put to datastore.
    deletes: ItemList of keys to delete from datastore.
    max_pool_size: maximum single list pool size in bytes. List changes will
      be flushed when this size is reached.
  """

  def __init__(self,
               max_pool_size=MAX_POOL_SIZE,
               max_entity_count=MAX_ENTITY_COUNT,
               mapreduce_spec=None):
    """Constructor.

    Args:
      max_pool_size: maximum pool size in bytes before flushing it to db.
      max_entity_count: maximum number of entities before flushing it to db.
      mapreduce_spec: An optional instance of model.MapreduceSpec.
    """
    self.max_pool_size = max_pool_size
    self.max_entity_count = max_entity_count
    params = mapreduce_spec.params if mapreduce_spec is not None else {}
    self.force_writes = bool(params.get("force_ops_writes", False))
    self.puts = ItemList()
    self.deletes = ItemList()
    self.ndb_puts = ItemList()
    self.ndb_deletes = ItemList()

  def put(self, entity):
    """Registers entity to put to datastore.

    Args:
      entity: an entity or model instance to put.
    """
    actual_entity = _normalize_entity(entity)
    if actual_entity is None:
      return self.ndb_put(entity)
    entity_size = len(actual_entity._ToPb().Encode())
    if (self.puts.length >= self.max_entity_count or
        (self.puts.size + entity_size) > self.max_pool_size):
      self.__flush_puts()
    self.puts.append(actual_entity, entity_size)

  def ndb_put(self, entity):
    """Like put(), but for NDB entities."""
    assert ndb is not None and isinstance(entity, ndb.Model)
    entity_size = len(entity._to_pb().Encode())
    if (self.ndb_puts.length >= self.max_entity_count or
        (self.ndb_puts.size + entity_size) > self.max_pool_size):
      self.__flush_ndb_puts()
    self.ndb_puts.append(entity, entity_size)

  def delete(self, entity):
    """Registers entity to delete from datastore.

    Args:
      entity: an entity, model instance, or key to delete.
    """
    key = _normalize_key(entity)
    if key is None:
      return self.ndb_delete(entity)
    key_size = len(key._ToPb().Encode())
    if (self.deletes.length >= self.max_entity_count or
        (self.deletes.size + key_size) > self.max_pool_size):
      self.__flush_deletes()
    self.deletes.append(key, key_size)

  def ndb_delete(self, entity_or_key):
    """Like delete(), but for NDB entities/keys."""
    if ndb is not None and isinstance(entity_or_key, ndb.Model):
      key = entity_or_key.key
    else:
      key = entity_or_key
    key_size = len(key.reference().Encode())
    if (self.ndb_deletes.length >= self.max_entity_count or
        (self.ndb_deletes.size + key_size) > self.max_pool_size):
      self.__flush_ndb_deletes()
    self.ndb_deletes.append(key, key_size)

  def flush(self):
    """Flush (apply) all changes to datastore."""
    self.__flush_puts()
    self.__flush_deletes()
    self.__flush_ndb_puts()
    self.__flush_ndb_deletes()

  def __flush_puts(self):
    """Flush all puts to datastore."""
    if self.puts.length:
      datastore.Put(self.puts.items, config=self.__create_config())
    self.puts.clear()

  def __flush_deletes(self):
    """Flush all deletes to datastore."""
    if self.deletes.length:
      datastore.Delete(self.deletes.items, config=self.__create_config())
    self.deletes.clear()

  def __flush_ndb_puts(self):
    """Flush all NDB puts to datastore."""
    if self.ndb_puts.length:
      ndb.put_multi(self.ndb_puts.items, config=self.__create_config())
    self.ndb_puts.clear()

  def __flush_ndb_deletes(self):
    """Flush all NDB deletes to datastore."""
    if self.ndb_deletes.length:
      ndb.delete_multi(self.ndb_deletes.items, config=self.__create_config())
    self.ndb_deletes.clear()

  def __create_config(self):
    """Creates datastore Config.

    Returns:
      A datastore_rpc.Configuration instance.
    """
    return datastore.CreateConfig(deadline=DATASTORE_DEADLINE,
                                  force_writes=self.force_writes)
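

# Illustrative sketch (not part of the original source; the iterables are
# hypothetical): the pool buffers puts and deletes per API kind (db vs. NDB)
# and issues a single batch RPC once a list crosses max_entity_count items or
# max_pool_size bytes. An explicit flush() drains whatever remains buffered.
#
#   pool = MutationPool(max_pool_size=100 * 1000, max_entity_count=50)
#   for entity in entities_to_save:   # hypothetical iterable of db models
#     pool.put(entity)                # may trigger a batch write inline
#   for key in keys_to_remove:        # hypothetical iterable of keys
#     pool.delete(key)
#   pool.flush()                      # drain all four internal ItemLists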


class Counters(object):
  """Regulates access to counters."""

  def __init__(self, shard_state):
    """Constructor.

    Args:
      shard_state: current mapreduce shard state as model.ShardState.
    """
    self._shard_state = shard_state

  def increment(self, counter_name, delta=1):
    """Increment counter value.

    Args:
      counter_name: name of the counter as string.
      delta: increment delta as int.
    """
    self._shard_state.counters_map.increment(counter_name, delta)

  def flush(self):
    """Flush unsaved counter values."""
    # Counter values accumulate on the shard state, which the framework
    # persists separately, so there is nothing to write here.
    pass
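

# Illustrative sketch (not part of the original source): counters accumulate
# on the shard state and surface in the mapreduce status UI. The framework
# itself maintains the COUNTER_MAPPER_* counters defined above; handler code
# may increment its own names alongside them.
#
#   counters = Counters(shard_state)  # shard_state: a model.ShardState
#   counters.increment("rows-skipped")
#   counters.increment(COUNTER_MAPPER_CALLS, delta=10)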


class Context(object):
  """MapReduce execution context.

  Properties:
    mapreduce_spec: current mapreduce specification as model.MapreduceSpec.
    shard_state: current shard state as model.ShardState.
    mutation_pool: current mutation pool as MutationPool.
    counters: counters object as Counters.
  """

  # Thread-local storage for the per-request context instance.
  _local = threading.local()

  def __init__(self, mapreduce_spec, shard_state, task_retry_count=0):
    """Constructor.

    Args:
      mapreduce_spec: mapreduce specification as model.MapreduceSpec.
      shard_state: shard state as model.ShardState.
      task_retry_count: number of times the task executing this context
        has been retried.
    """
    self.mapreduce_spec = mapreduce_spec
    self.shard_state = shard_state
    self.task_retry_count = task_retry_count

    if self.mapreduce_spec:
      self.mapreduce_id = self.mapreduce_spec.mapreduce_id
    else:
      # Only in tests.
      self.mapreduce_id = None
    if self.shard_state:
      self.shard_id = self.shard_state.get_shard_id()
    else:
      # Only in tests.
      self.shard_id = None

    # Halve the pool limits on each task retry: if a previous attempt failed
    # with full-size batches, the retry uses smaller ones.
    self.mutation_pool = MutationPool(
        max_pool_size=(MAX_POOL_SIZE / (2 ** self.task_retry_count)),
        max_entity_count=(MAX_ENTITY_COUNT / (2 ** self.task_retry_count)),
        mapreduce_spec=mapreduce_spec)
    self.counters = Counters(shard_state)

    self._pools = {}
    self.register_pool("mutation_pool", self.mutation_pool)
    self.register_pool("counters", self.counters)

  def flush(self):
    """Flush all information recorded in context."""
    for pool in self._pools.values():
      pool.flush()

  def register_pool(self, key, pool):
    """Register an arbitrary pool to be flushed together with this context.

    Args:
      key: pool key as string.
      pool: a pool instance. Pool should implement flush(self) method.
    """
    self._pools[key] = pool

  def get_pool(self, key):
    """Obtains an instance of registered pool.

    Args:
      key: pool key as string.

    Returns:
      an instance of the pool registered earlier, or None.
    """
    return self._pools.get(key, None)

  @classmethod
  def _set(cls, context):
    """Set current context instance.

    Args:
      context: new context as Context or None.
    """
    cls._local._context_instance = context


def get():
  """Get current context instance.

  Returns:
    current context as Context, or None if it has not been set.
  """
  if not hasattr(Context._local, "_context_instance"):
    return None
  return Context._local._context_instance
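

# Illustrative sketch (not part of the original source; LogPool is
# hypothetical, and assumes the standard logging module is imported): any
# object with a flush() method can be registered on the context, and the
# framework drains it together with the built-in mutation pool and counters.
#
#   class LogPool(object):
#     def __init__(self):
#       self.lines = []
#     def append(self, line):
#       self.lines.append(line)
#     def flush(self):
#       logging.info("\n".join(self.lines))
#       self.lines = []
#
#   ctx = get()
#   if ctx.get_pool("log_pool") is None:
#     ctx.register_pool("log_pool", LogPool())
#   ctx.get_pool("log_pool").append("processed one entity")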