# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

33 """Mapreduce execution context.
35 Mapreduce context provides handler code with information about
36 current mapreduce execution and organizes utility data flow
37 from handlers such as counters, log messages, mutation pools.
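# A minimal usage sketch for handler code (illustration only, not part of
# this module's API; assumes this file is imported as ``context`` and that
# ``process`` is a hypothetical map handler receiving a db.Model instance):
#
#   def process(entity):
#     ctx = context.get()
#     ctx.counters.increment("entities-seen")
#     entity.processed = True
#     ctx.mutation_pool.put(entity)  # batched; flushed by the framework
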
48 "COUNTER_MAPPER_CALLS",
49 "COUNTER_MAPPER_WALLTIME_MS",
import threading

from google.appengine.api import datastore
from google.appengine.ext import db

try:
  from google.appengine.ext import ndb
except ImportError:
  ndb = None
# It is acceptable to set key_range.ndb to the ndb module,
# imported through some other way (e.g. from the app dir).

# Maximum pool size in bytes. The pool is flushed when it reaches this size.
MAX_POOL_SIZE = 900 * 1000

# Maximum number of items per pool. The pool is flushed when it reaches
# this count.
MAX_ENTITY_COUNT = 500

# Deadline in seconds for mutation pool datastore operations.
DATASTORE_DEADLINE = 15

# Name of the counter that counts all mapper calls.
COUNTER_MAPPER_CALLS = "mapper-calls"

# Total walltime in milliseconds consumed by the mapper, excluding time
# spent in the rest of the mapreduce pipeline.
COUNTER_MAPPER_WALLTIME_MS = "mapper-walltime-ms"


def _normalize_entity(value):
  """Return an entity from an entity or model instance."""
  if ndb is not None and isinstance(value, ndb.Model):
    # Signal to the caller that this value must go through the ndb code path.
    return None
  if getattr(value, "_populate_internal_entity", None):
    return value._populate_internal_entity()
  return value


def _normalize_key(value):
  """Return a key from an entity, model instance, key, or key string."""
  if ndb is not None and isinstance(value, (ndb.Model, ndb.Key)):
    # Signal to the caller that this value must go through the ndb code path.
    return None
  if getattr(value, "key", None):
    return value.key()
  elif isinstance(value, basestring):
    return datastore.Key(value)
  else:
    return value

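# Normalization sketch: db values pass through (model instances are reduced
# to their underlying datastore entity or key), while ndb values map to None
# so callers can dispatch to the ndb_* code paths. SomeDbModel/SomeNdbModel
# are hypothetical models, shown only for illustration:
#
#   _normalize_entity(SomeDbModel())   # -> datastore.Entity
#   _normalize_entity(SomeNdbModel())  # -> None
#   _normalize_key(SomeDbModel())      # -> value.key(), a datastore key

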
class ItemList(object):
  """Holds a list of arbitrary items, and their total size.

  Properties:
    items: list of objects.
    length: length of item list.
    size: aggregate item size in bytes.
  """

  def __init__(self):
    """Constructor."""
    self.items = []
    self.length = 0
    self.size = 0

  def append(self, item, item_size):
    """Add new item to the list.

    Args:
      item: an item to add to the list.
      item_size: item size in bytes as int.
    """
    self.items.append(item)
    self.length += 1
    self.size += item_size

  def clear(self):
    """Clear item list."""
    self.items = []
    self.length = 0
    self.size = 0

  @property
  def entities(self):
    """Return items. For backwards compatibility."""
    return self.items

EntityList = ItemList

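# Size accounting sketch: ItemList tracks only the sizes its caller reports,
# e.g. serialized protobuf lengths (the literal values here are arbitrary):
#
#   items = ItemList()
#   items.append("a" * 10, 10)
#   items.append("b" * 20, 20)
#   assert (items.length, items.size) == (2, 30)

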
class MutationPool(object):
  """Mutation pool accumulates datastore changes to perform them in batch.

  Properties:
    puts: ItemList of entities to put to datastore.
    deletes: ItemList of keys to delete from datastore.
    max_pool_size: maximum single list pool size. List changes will be flushed
      when this size is reached.
  """

  def __init__(self,
               max_pool_size=MAX_POOL_SIZE,
               max_entity_count=MAX_ENTITY_COUNT,
               mapreduce_spec=None):
    """Constructor.

    Args:
      max_pool_size: maximum pool size in bytes before flushing it to db.
      max_entity_count: maximum number of entities before flushing it to db.
      mapreduce_spec: an optional instance of MapperSpec.
    """
    self.max_pool_size = max_pool_size
    self.max_entity_count = max_entity_count
    params = mapreduce_spec.params if mapreduce_spec is not None else {}
    self.force_writes = bool(params.get("force_ops_writes", False))
    self.puts = ItemList()
    self.deletes = ItemList()
    self.ndb_puts = ItemList()
    self.ndb_deletes = ItemList()

  def put(self, entity):
    """Registers entity to put to datastore.

    Args:
      entity: an entity or model instance to put.
    """
    actual_entity = _normalize_entity(entity)
    if actual_entity is None:
      return self.ndb_put(entity)
    entity_size = len(actual_entity._ToPb().Encode())
    # Flush the accumulated batch first if this entity would push the pool
    # over its count or size limit.
    if (self.puts.length >= self.max_entity_count or
        (self.puts.size + entity_size) > self.max_pool_size):
      self.__flush_puts()
    self.puts.append(actual_entity, entity_size)

  def ndb_put(self, entity):
    """Like put(), but for NDB entities."""
    assert ndb is not None and isinstance(entity, ndb.Model)
    entity_size = len(entity._to_pb().Encode())
    if (self.ndb_puts.length >= self.max_entity_count or
        (self.ndb_puts.size + entity_size) > self.max_pool_size):
      self.__flush_ndb_puts()
    self.ndb_puts.append(entity, entity_size)

  def delete(self, entity):
    """Registers entity to delete from datastore.

    Args:
      entity: an entity, model instance, or key to delete.
    """
    key = _normalize_key(entity)
    if key is None:
      return self.ndb_delete(entity)
    key_size = len(key._ToPb().Encode())
    if (self.deletes.length >= self.max_entity_count or
        (self.deletes.size + key_size) > self.max_pool_size):
      self.__flush_deletes()
    self.deletes.append(key, key_size)

  def ndb_delete(self, entity_or_key):
    """Like delete(), but for NDB entities/keys."""
    if isinstance(entity_or_key, ndb.Model):
      key = entity_or_key.key
    else:
      key = entity_or_key
    key_size = len(key.reference().Encode())
    if (self.ndb_deletes.length >= self.max_entity_count or
        (self.ndb_deletes.size + key_size) > self.max_pool_size):
      self.__flush_ndb_deletes()
    self.ndb_deletes.append(key, key_size)

233 """Flush(apply) all changed to datastore."""
235 self
.__flush
_deletes
()
236 self
.__flush
_ndb
_puts
()
237 self
.__flush
_ndb
_deletes
()
  def __flush_puts(self):
    """Flush all puts to datastore."""
    if self.puts.length:
      datastore.Put(self.puts.items, config=self.__create_config())
    self.puts.clear()

  def __flush_deletes(self):
    """Flush all deletes to datastore."""
    if self.deletes.length:
      datastore.Delete(self.deletes.items, config=self.__create_config())
    self.deletes.clear()

  def __flush_ndb_puts(self):
    """Flush all NDB puts to datastore."""
    if self.ndb_puts.length:
      ndb.put_multi(self.ndb_puts.items, config=self.__create_config())
    self.ndb_puts.clear()

  def __flush_ndb_deletes(self):
    """Flush all NDB deletes to datastore."""
    if self.ndb_deletes.length:
      ndb.delete_multi(self.ndb_deletes.items, config=self.__create_config())
    self.ndb_deletes.clear()

  def __create_config(self):
    """Creates a datastore Config.

    Returns:
      A datastore_rpc.Configuration instance.
    """
    return datastore.CreateConfig(deadline=DATASTORE_DEADLINE,
                                  force_writes=self.force_writes)

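# Batching sketch (standalone use for illustration only; handler code
# normally reaches the pool through context.get().mutation_pool, and
# entity_a/b/c are hypothetical db.Model instances):
#
#   pool = MutationPool(max_entity_count=2)
#   pool.put(entity_a)  # buffered
#   pool.put(entity_b)  # buffered; the pool is now at max_entity_count
#   pool.put(entity_c)  # datastore.Put([entity_a, entity_b]) runs first
#   pool.flush()        # writes the remaining entity_c

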
class Counters(object):
  """Regulates access to counters."""

  def __init__(self, shard_state):
    """Constructor.

    Args:
      shard_state: current mapreduce shard state as model.ShardState.
    """
    self._shard_state = shard_state

  def increment(self, counter_name, delta=1):
    """Increment counter value.

    Args:
      counter_name: name of the counter as string.
      delta: increment delta as int.
    """
    self._shard_state.counters_map.increment(counter_name, delta)

  def flush(self):
    """Flush unsaved counter values."""
    pass

class Context(object):
  """MapReduce execution context.

  Properties:
    mapreduce_spec: current mapreduce specification as model.MapreduceSpec.
    shard_state: current shard state as model.ShardState.
    mutation_pool: current mutation pool as MutationPool.
    counters: counters object as Counters.
  """

  # The current context instance, stored per thread.
  _local = threading.local()

  def __init__(self, mapreduce_spec, shard_state, task_retry_count=0):
    """Constructor.

    Args:
      mapreduce_spec: mapreduce specification as model.MapreduceSpec.
      shard_state: shard state as model.ShardState.
      task_retry_count: number of times the handling task has been retried.
    """
    self.mapreduce_spec = mapreduce_spec
    self.shard_state = shard_state
    self.task_retry_count = task_retry_count

    if self.mapreduce_spec:
      self.mapreduce_id = self.mapreduce_spec.mapreduce_id
    else:
      # Only in tests.
      self.mapreduce_id = None
    if self.shard_state:
      self.shard_id = self.shard_state.get_shard_id()
    else:
      # Only in tests.
      self.shard_id = None

    self._pools = {}
    # Halve the pool limits on every task retry so that a retried slice
    # issues smaller datastore batches.
    self.mutation_pool = MutationPool(
        max_pool_size=(MAX_POOL_SIZE/(2**self.task_retry_count)),
        max_entity_count=(MAX_ENTITY_COUNT/(2**self.task_retry_count)),
        mapreduce_spec=mapreduce_spec)
    self.counters = Counters(shard_state)

    self.register_pool("mutation_pool", self.mutation_pool)
    self.register_pool("counters", self.counters)

346 """Flush all information recorded in context."""
347 for pool
in self
._pools
.values():
  def register_pool(self, key, pool):
    """Register an arbitrary pool to be flushed together with this context.

    Args:
      key: pool key as string.
      pool: a pool instance. Pool should implement flush(self) method.
    """
    self._pools[key] = pool

  def get_pool(self, key):
    """Obtains an instance of registered pool.

    Args:
      key: pool key as string.

    Returns:
      an instance of the pool registered earlier, or None.
    """
    return self._pools.get(key, None)

  @classmethod
  def _set(cls, context):
    """Set current context instance.

    Args:
      context: new context as Context or None.
    """
    cls._local._context_instance = context


387 """Get current context instance.
390 current context as Context.
392 if not hasattr(Context
._local
, '_context_instance') :
394 return Context
._local
._context
_instance