App Engine Python SDK version 1.8.8
[gae.git] / python / google / appengine / ext / mapreduce / control.py
blob42946cf9d5dab7e68ffb5ed3c9145408b56921e7
1 #!/usr/bin/env python
3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
33 """API for controlling MapReduce execution outside of MapReduce framework."""
36 __all__ = ["start_map"]
40 import logging
41 import google
43 from google.appengine.ext import db
44 from google.appengine.ext.mapreduce import handlers
45 from google.appengine.ext.mapreduce import model
46 from google.appengine.ext.mapreduce import parameters
47 from google.appengine.ext.mapreduce import util
def start_map(name,
              handler_spec,
              reader_spec,
              mapper_parameters,
              shard_count=None,
              output_writer_spec=None,
              mapreduce_parameters=None,
              base_path=None,
              queue_name=None,
              eta=None,
              countdown=None,
              hooks_class_name=None,
              _app=None,
              in_xg_transaction=False):
    """Launch a mapper-only mapreduce job.

    Normalizes the supplied arguments (filling unspecified ones from
    parameters.config), builds a model.MapperSpec and delegates the actual
    job start to handlers.StartJobHandler._start_map.

    Args:
      name: display name of the mapreduce; used for display purposes only.
      handler_spec: fully qualified name of the mapper handler function or
        class to invoke.
      reader_spec: fully qualified name of the mapper input reader.
      mapper_parameters: dictionary of mapper-specific parameters; also used
        to initialize the reader. A non-empty value is shallow-copied before
        use; an empty or None value is forwarded unchanged.
      shard_count: number of shards to create. When None,
        parameters.config.SHARD_COUNT is used.
      output_writer_spec: output writer specification, forwarded as-is to
        model.MapperSpec.
      mapreduce_parameters: dictionary of parameters relevant to the whole
        job. It is shallow-copied, and a "base_path" entry is added when the
        caller did not provide one.
      base_path: base path of the mapreduce library handler as configured in
        app.yaml. When None, parameters.config.BASE_PATH is used.
      queue_name: taskqueue queue name for the mapreduce tasks; resolved
        through util.get_queue_name.
      eta: absolute time at which the MR should execute. May not be combined
        with 'countdown'. May be timezone-aware or timezone-naive.
      countdown: number of seconds into the future at which the MR should
        execute. Defaults to zero.
      hooks_class_name: fully qualified name of a hooks.Hooks subclass.
      _app: forwarded unchanged to handlers.StartJobHandler._start_map.
      in_xg_transaction: selects the transaction scope used to start the job.
        If True, an already opened cross-group transaction is expected and MR
        will use one entity group from it; when no transaction is open only a
        warning is logged here and the start still proceeds. If False, MR
        creates an independent transaction regardless of any existing
        transaction scope.

    Returns:
      mapreduce id as string.
    """
    # Resolve library-wide defaults for anything the caller left unspecified.
    shards = parameters.config.SHARD_COUNT if shard_count is None else shard_count
    root_path = parameters.config.BASE_PATH if base_path is None else base_path

    # Work on private copies so the caller's dictionaries are never mutated.
    # Falsy mapper parameters (None, {}) are passed through untouched.
    mapper_params = (
        dict(mapper_parameters) if mapper_parameters else mapper_parameters)

    job_params = dict(mapreduce_parameters) if mapreduce_parameters else {}
    # An explicit "base_path" inside mapreduce_parameters takes precedence.
    job_params.setdefault("base_path", root_path)

    spec = model.MapperSpec(handler_spec,
                            reader_spec,
                            mapper_params,
                            shards,
                            output_writer_spec=output_writer_spec)

    if in_xg_transaction:
        if not db.is_in_transaction():
            # NOTE(review): the message mentions "transactional" although the
            # argument is named in_xg_transaction; text kept as-is.
            logging.warning("Expects an opened xg transaction to start mapreduce "
                            "when transactional is True.")

    # job_params always carries "base_path" here, so it is never empty.
    launch = handlers.StartJobHandler._start_map
    return launch(name,
                  spec,
                  job_params,
                  base_path=root_path,
                  queue_name=util.get_queue_name(queue_name),
                  eta=eta,
                  countdown=countdown,
                  hooks_class_name=hooks_class_name,
                  _app=_app,
                  in_xg_transaction=in_xg_transaction)