1.9.30 sync.
[gae.git] / python / google / appengine / ext / mapreduce / parameters.py
blobc041c6599285553f987717a159e3a6f3ce7a1fc6
1 #!/usr/bin/env python
3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 """Parameters to control Mapreduce."""
# Names exported by "from ... import *".
__all__ = ["CONFIG_NAMESPACE",
           "config"]
22 import pickle
24 import google
32 try:
33 from appengine_pipeline.src.pipeline import util as pipeline_util
34 except ImportError:
35 pipeline_util = None
37 from google.appengine.api import lib_config
# Namespace under which this module registers its defaults with lib_config.
CONFIG_NAMESPACE = "mapreduce"
class _JobConfigMeta(type):
  """Metaclass that gathers _Option attributes while a config class is built."""

  # Names of the class fields this metaclass installs on every new class.
  _OPTIONS = "_options"
  _REQUIRED = "_required"

  def __new__(mcs, classname, bases, class_dict):
    """Builds a config class whose _Option attributes are moved into tables.

    Args:
      classname: name of the class.
      bases: base classes of the class being created.
      class_dict: original class dict; it is modified in place.

    Returns:
      The new class. It carries two extra fields: _options, a dict from
      option name to _Option object, and _required, a set holding the names
      of the options marked as required.
    """
    declared = dict((attr, value) for attr, value in class_dict.iteritems()
                    if isinstance(value, _Option))
    mandatory = set(attr for attr, value in declared.iteritems()
                    if value.required)

    # Option declarations must not remain as ordinary class attributes.
    for attr in declared:
      del class_dict[attr]
    class_dict[mcs._OPTIONS] = declared
    class_dict[mcs._REQUIRED] = mandatory
    cls = type.__new__(mcs, classname, bases, class_dict)

    # A class deriving directly from object has nothing to inherit.
    if object in bases:
      return cls

    # declared and mandatory are the very objects stored on cls, so the
    # updates below extend cls._options and cls._required in place.
    inherited = {}
    for klass in reversed(cls.__mro__):
      klass_dict = klass.__dict__
      if mcs._OPTIONS in klass_dict:
        inherited.update(klass_dict[mcs._OPTIONS])
      if mcs._REQUIRED in klass_dict:
        mandatory.update(klass_dict[mcs._REQUIRED])
    # Options declared on the class itself win over inherited ones.
    for attr, value in inherited.iteritems():
      declared.setdefault(attr, value)
    return cls
95 class _Option(object):
96 """An option for _Config."""
98 def __init__(self, kind, required=False, default_factory=None,
99 can_be_none=False):
100 """Init.
102 Args:
103 kind: type of the option.
104 required: whether user is required to supply a value.
105 default_factory: a factory, when called, returns the default value.
106 can_be_none: whether value can be None.
108 Raises:
109 ValueError: if arguments aren't compatible.
111 if required and default_factory is not None:
112 raise ValueError("No default_factory value when option is required.")
113 self.kind = kind
114 self.required = required
115 self.default_factory = default_factory
116 self.can_be_none = can_be_none
class _Config(object):
  """Root class for all per job configuration.

  Reads two class fields, _options (option name -> _Option) and _required
  (names of required options), to validate and store the keyword arguments
  given at construction time.
  """

  __metaclass__ = _JobConfigMeta

  def __init__(self, _lenient=False, **kwds):
    """Init.

    Args:
      _lenient: When true, no option is required.
      **kwds: keyword arguments for options and their values.

    Raises:
      ValueError: if an unsupported option is given, or a required option
        is missing while _lenient is false.
      TypeError: if a value doesn't match its option's kind while _lenient
        is false.
    """
    self._verify_keys(kwds, _lenient)
    self._set_values(kwds, _lenient)

  def _verify_keys(self, kwds, _lenient):
    """Checks that only known options are given and none required is missing.

    Args:
      kwds: dict of option names to values supplied by the caller.
      _lenient: When true, missing required options are tolerated.

    Raises:
      ValueError: if an option is not supported, or required options are
        missing while _lenient is false.
    """
    for k in kwds:
      if k not in self._options:
        raise ValueError("Option %s is not supported." % (k))
    if not _lenient:
      missing = self._required - set(kwds)
      if missing:
        # Join the names into one string: "%s" % tuple(missing) only works
        # for exactly one name and raises TypeError for two or more.
        raise ValueError(
            "Options %s are required." % ", ".join(sorted(missing)))

  def _set_values(self, kwds, _lenient):
    """Stores every option on the instance and type checks the values.

    Args:
      kwds: dict of option names to values supplied by the caller.
      _lenient: When true, values are stored without any type check.

    Raises:
      TypeError: if a value is neither an instance nor a subclass of its
        option's kind while _lenient is false.
    """
    for k, option in self._options.iteritems():
      v = kwds.get(k)
      # An absent or None value falls back to the option's default, if any.
      if v is None and option.default_factory:
        v = option.default_factory()
      # The value is stored before it is validated.
      setattr(self, k, v)
      if _lenient:
        continue
      if v is None and option.can_be_none:
        continue
      # A class is checked with issubclass, anything else with isinstance.
      if isinstance(v, type) and not issubclass(v, option.kind):
        raise TypeError(
            "Expect subclass of %r for option %s. Got %r" % (
                option.kind, k, v))
      if not isinstance(v, type) and not isinstance(v, option.kind):
        raise TypeError("Expect type %r for option %s. Got %r" % (
            option.kind, k, v))

  def __eq__(self, other):
    """Two configs are equal when other is an instance of this class and
    both hold the same instance attributes."""
    if not isinstance(other, self.__class__):
      return False
    return other.__dict__ == self.__dict__

  def __ne__(self, other):
    """Negation of __eq__; Python 2 does not derive != from __eq__."""
    return not self.__eq__(other)

  def __repr__(self):
    return str(self.__dict__)

  def to_json(self):
    """Returns a dict holding the pickled config under the "config" key."""
    return {"config": pickle.dumps(self)}

  @classmethod
  def from_json(cls, json):
    """Restores the object pickled under the "config" key of json.

    Args:
      json: a dict as produced by to_json.

    Returns:
      The unpickled object.
    """
    # NOTE(security): pickle.loads can execute arbitrary code. Only feed this
    # payloads that were produced by to_json and stored by trusted code.
    return pickle.loads(json["config"])
180 class _ConfigDefaults(object):
181 """Default configs.
183 Do not change parameters whose names begin with _.
185 SHARD_MAX_ATTEMPTS: Max attempts to execute a shard before giving up.
187 TASK_MAX_ATTEMPTS: Max attempts to execute a task before dropping it. Task
188 is any taskqueue task created by MR framework. A task is dropped
189 when its X-AppEngine-TaskExecutionCount is bigger than this number.
190 Dropping a task will cause abort on the entire MR job.
192 TASK_MAX_DATA_PROCESSING_ATTEMPTS:
193 Max times to execute a task when previous task attempts failed during
194 data processing stage. An MR work task has three major stages:
195 initial setup, data processing, and final checkpoint.
196 Setup stage should be allowed to be retried more times than data processing
197 stage: setup failures are caused by unavailable GAE services while
198 data processing failures are mostly due to user function error out on
199 certain input data. Thus, set TASK_MAX_ATTEMPTS higher than this parameter.
201 QUEUE_NAME: Default queue for MR.
203 SHARD_COUNT: Default shard count.
205 PROCESSING_RATE_PER_SEC: Default rate of processed entities per second.
207 BASE_PATH : Base path of mapreduce and pipeline handlers.
210 SHARD_MAX_ATTEMPTS = 4
213 TASK_MAX_ATTEMPTS = 31
215 TASK_MAX_DATA_PROCESSING_ATTEMPTS = 11
217 QUEUE_NAME = "default"
219 SHARD_COUNT = 8
225 PROCESSING_RATE_PER_SEC = 1000000
228 BASE_PATH = "/_ah/mapreduce"
233 _SLICE_DURATION_SEC = 15
236 _CONTROLLER_PERIOD_SEC = 2
# Register the defaults with lib_config under CONFIG_NAMESPACE. The returned
# object is what the statements below read their settings from.
config = lib_config.register(CONFIG_NAMESPACE, _ConfigDefaults.__dict__)

# Default pipeline base path: the configured BASE_PATH plus "/pipeline".
_DEFAULT_PIPELINE_BASE_PATH = config.BASE_PATH + "/pipeline"

# NOTE(review): presumably the urlfetch timeout, in seconds, for GCS calls;
# confirm against the code that uses it.
_GCS_URLFETCH_TIMEOUT_SEC = 30

# Lease duration: 10% longer than the slice duration.
_LEASE_DURATION_SEC = config._SLICE_DURATION_SEC * 1.1

# Maximum lease duration: 1.5 slice durations, but never less than
# 10 minutes and 30 seconds.
_MAX_LEASE_DURATION_SEC = max(10 * 60 + 30, config._SLICE_DURATION_SEC * 1.5)