3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 """Parameters to control Mapreduce."""
19 __all__
= ["CONFIG_NAMESPACE",
33 from appengine_pipeline
.src
.pipeline
import util
as pipeline_util
37 from google
.appengine
.api
import lib_config
39 CONFIG_NAMESPACE
= "mapreduce"
class _JobConfigMeta(type):
  """Metaclass that controls class creation."""

  # Names of the synthesized class-dict entries.
  _OPTIONS = "_options"
  _REQUIRED = "_required"

  def __new__(mcs, classname, bases, class_dict):
    """Creates a _Config class and modifies its class dict.

    Args:
      classname: name of the class.
      bases: a list of base classes.
      class_dict: original class dict.

    Returns:
      A new _Config class. The modified class will have two fields.
      _options field is a dict from option name to _Option objects.
      _required field is a set of required option names.
    """
    options = {}
    required = set()
    for name, option in class_dict.iteritems():
      if isinstance(option, _Option):
        options[name] = option
        if option.required:
          required.add(name)

    # Remove the _Option placeholders from the class body; instances get
    # plain attributes with the resolved values instead (see _Config).
    for name in options:
      class_dict.pop(name)
    class_dict[mcs._OPTIONS] = options
    class_dict[mcs._REQUIRED] = required
    cls = type.__new__(mcs, classname, bases, class_dict)

    # Handle inheritance: a subclass of a _Config (anything whose bases do
    # not include object directly) inherits options/required from ancestors.
    if object not in bases:
      parent_options = {}
      # Walk the MRO from the root down so that children override parents.
      for c in reversed(cls.__mro__):
        if mcs._OPTIONS in c.__dict__:
          parent_options.update(c.__dict__[mcs._OPTIONS])
        if mcs._REQUIRED in c.__dict__:
          required.update(c.__dict__[mcs._REQUIRED])
      # Only add inherited options this class did not declare itself.
      for k, v in parent_options.iteritems():
        if k not in options:
          options[k] = v
    return cls
95 class _Option(object):
96 """An option for _Config."""
98 def __init__(self
, kind
, required
=False, default_factory
=None,
103 kind: type of the option.
104 required: whether user is required to supply a value.
105 default_factory: a factory, when called, returns the default value.
106 can_be_none: whether value can be None.
109 ValueError: if arguments aren't compatible.
111 if required
and default_factory
is not None:
112 raise ValueError("No default_factory value when option is required.")
114 self
.required
= required
115 self
.default_factory
= default_factory
116 self
.can_be_none
= can_be_none
class _Config(object):
  """Root class for all per job configuration."""

  __metaclass__ = _JobConfigMeta

  def __init__(self, _lenient=False, **kwds):
    """Init.

    Args:
      _lenient: When true, no option is required.
      **kwds: keyword arguments for options and their values.
    """
    self._verify_keys(kwds, _lenient)
    self._set_values(kwds, _lenient)

  def _verify_keys(self, kwds, _lenient):
    """Rejects unknown option names and, unless lenient, missing required ones.

    Raises:
      ValueError: if a key is unsupported or a required option is absent.
    """
    keys = set()
    for k in kwds:
      if k not in self._options:
        raise ValueError("Option %s is not supported." % (k))
      keys.add(k)
    if not _lenient:
      missing = self._required - keys
      if missing:
        raise ValueError("Options %s are required." % tuple(missing))

  def _set_values(self, kwds, _lenient):
    """Resolves each option value (applying default_factory) and type-checks it.

    Raises:
      TypeError: if a value does not match its option's declared kind.
    """
    for k, option in self._options.iteritems():
      v = kwds.get(k)
      if v is None and option.default_factory:
        v = option.default_factory()
      setattr(self, k, v)
      if _lenient:
        continue
      if v is None and option.can_be_none:
        continue
      # Class-valued options are checked with issubclass; instance values
      # with isinstance.
      if isinstance(v, type) and not issubclass(v, option.kind):
        raise TypeError(
            "Expect subclass of %r for option %s. Got %r" % (
                option.kind, k, v))
      if not isinstance(v, type) and not isinstance(v, option.kind):
        raise TypeError("Expect type %r for option %s. Got %r" % (
            option.kind, k, v))

  def __eq__(self, other):
    # NotImplemented (rather than False) lets the other operand try its own
    # comparison; it still evaluates falsy under ==.
    if not isinstance(other, self.__class__):
      return NotImplemented
    return other.__dict__ == self.__dict__

  def __repr__(self):
    return str(self.__dict__)

  def to_json(self):
    # NOTE(review): pickle is only safe here because the payload round-trips
    # through the framework's own task queue, never untrusted input — confirm.
    return {"config": pickle.dumps(self)}

  @classmethod
  def from_json(cls, json):
    return pickle.loads(json["config"])
180 class _ConfigDefaults(object):
183 Do not change parameters whose names begin with _.
185 SHARD_MAX_ATTEMPTS: Max attempts to execute a shard before giving up.
187 TASK_MAX_ATTEMPTS: Max attempts to execute a task before dropping it. Task
188 is any taskqueue task created by MR framework. A task is dropped
189 when its X-AppEngine-TaskExecutionCount is bigger than this number.
190 Dropping a task will cause abort on the entire MR job.
192 TASK_MAX_DATA_PROCESSING_ATTEMPTS:
193 Max times to execute a task when previous task attempts failed during
194 data processing stage. An MR work task has three major stages:
195 initial setup, data processing, and final checkpoint.
196 Setup stage should be allowed to be retried more times than data processing
197 stage: setup failures are caused by unavailable GAE services while
198 data processing failures are mostly due to user function error out on
199 certain input data. Thus, set TASK_MAX_ATTEMPTS higher than this parameter.
201 QUEUE_NAME: Default queue for MR.
203 SHARD_COUNT: Default shard count.
205 PROCESSING_RATE_PER_SEC: Default rate of processed entities per second.
207 BASE_PATH : Base path of mapreduce and pipeline handlers.
210 SHARD_MAX_ATTEMPTS
= 4
213 TASK_MAX_ATTEMPTS
= 31
215 TASK_MAX_DATA_PROCESSING_ATTEMPTS
= 11
217 QUEUE_NAME
= "default"
225 PROCESSING_RATE_PER_SEC
= 1000000
228 BASE_PATH
= "/_ah/mapreduce"
233 _SLICE_DURATION_SEC
= 15
236 _CONTROLLER_PERIOD_SEC
= 2
# Register the defaults with lib_config so an application's
# appengine_config.py can override any _ConfigDefaults attribute under the
# "mapreduce" namespace (e.g. mapreduce_BASE_PATH).
config = lib_config.register(CONFIG_NAMESPACE, _ConfigDefaults.__dict__)

# Pipeline handlers are mounted under the (possibly overridden) MR base path.
_DEFAULT_PIPELINE_BASE_PATH = config.BASE_PATH + "/pipeline"

# Timeout, in seconds, for GCS urlfetch calls.
_GCS_URLFETCH_TIMEOUT_SEC = 30

# Slightly longer than one slice so a lease outlives normal slice execution.
_LEASE_DURATION_SEC = config._SLICE_DURATION_SEC * 1.1

# Upper bound on lease duration: at least 10.5 minutes or 1.5 slices,
# whichever is larger. NOTE(review): the 10 * 60 + 30 floor presumably tracks
# the taskqueue request deadline plus slack — confirm.
_MAX_LEASE_DURATION_SEC = max(10 * 60 + 30, config._SLICE_DURATION_SEC * 1.5)