App Engine Python SDK version 1.7.7
[gae.git] / python / google / appengine / ext / mapreduce / file_format_root.py
blob193f5c4467fc86a6153868eab266484c8d048031
1 #!/usr/bin/env python
3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
31 """Define file format root."""
33 from __future__ import with_statement
37 __all__ = ['FileFormatRoot',
38 'split']
40 import copy
41 import google.appengine.ext.mapreduce.file_format_parser as parser
43 from google.appengine.api.files import file as files
44 from google.appengine.ext.mapreduce import model
45 from google.appengine.ext.mapreduce import file_formats
def split(filenames, format_string, shards):
    """Build the FileFormatRoots that together cover all input files.

    The format string is parsed, every input file is stat'ed for its size,
    and the average number of bytes per shard is derived from the total.
    How the files are then distributed depends on the first parsed format:
    if it reports that it can split, files may be divided internally
    (_deep_split); otherwise whole files are grouped (_shallow_split).

    Args:
      filenames: input filenames.
      format_string: format string from user.
      shards: number of shards to split inputs into.

    Returns:
      A list of FileFormatRoot, or None if all input files have zero bytes.
    """
    parsed_formats = parser.parse(format_string)

    file_sizes = []
    for filename in filenames:
        file_sizes.append(files.stat(filename).st_size)

    bytes_per_shard = float(sum(file_sizes)) / shards
    if bytes_per_shard == 0:
        # Nothing to read at all: signal it with None rather than a list.
        return None

    if not parsed_formats[0].can_split():
        return _shallow_split(filenames, bytes_per_shard, parsed_formats,
                              file_sizes)
    return _deep_split(filenames, bytes_per_shard, parsed_formats)
def _shallow_split(filenames, size_per_shard, parsed_formats, sizes):
    """Group whole files into roots using only their top level sizes.

    Files are never divided: each file is assigned in full to exactly one
    root. Files are accumulated in order, and a root is emitted as soon as
    the accumulated size strictly exceeds size_per_shard.

    Args:
      filenames: a list of input filenames.
      size_per_shard: target size per shard.
      parsed_formats: the parsed FileFormats; each root gets its own deep
        copy.
      sizes: sizes of the input files, parallel to filenames.

    Returns:
      A list of FileFormatRoot.
    """

    def build_root(file_ranges):
        # Every root needs independent format objects, hence the deep copy.
        return FileFormatRoot(copy.deepcopy(parsed_formats), file_ranges)

    roots = []
    pending = []
    accumulated = 0
    for position, size in enumerate(sizes):
        # A range of None asks for the entire file.
        pending.append(_FileRange(filenames[position], None))
        accumulated += size
        if accumulated > size_per_shard:
            roots.append(build_root(pending))
            pending = []
            accumulated = 0

    # Whatever is left over forms the final root.
    if pending:
        roots.append(build_root(pending))

    return roots
def _deep_split(filenames, size_per_shard, parsed_formats):
    """Split files into roots using the first FileFormat.

    Deep split can split within a file. It tells the first format how big
    a split it wants and the first format does the actual splitting,
    because only the first format knows how to operate on this particular
    format.

    Args:
      filenames: a list of input filenames.
      size_per_shard: size per shard.
      parsed_formats: the parsed FileFormats. Each root receives its own
        deep copy.

    Returns:
      A list of FileFormatRoot.
    """
    roots = []
    inputs = []
    size_left = size_per_shard

    for filename in filenames:
        index = 0
        with files.open(filename) as f:
            # Scratch dict handed to every split() call made for this file.
            cache_for_split = {}

            while True:
                if size_left <= 0:
                    # Current shard is full: emit it and start a new one.
                    roots.append(
                        FileFormatRoot(copy.deepcopy(parsed_formats), inputs))
                    size_left = size_per_shard
                    inputs = []
                start_index = index
                # The open handle is passed along so the first format can
                # work on the file it is asked to split.
                size_left, index = parsed_formats[0].split(size_left,
                                                           start_index,
                                                           f,
                                                           cache_for_split)

                # No progress means this file has nothing more to offer.
                if start_index == index:
                    break
                inputs.append(_FileRange(filename, (start_index, index)))

    # Ranges collected after the last full shard form the final root.
    if inputs:
        roots.append(FileFormatRoot(copy.deepcopy(parsed_formats), inputs))

    return roots
class _FileRange(model.JsonMixin):
    """Describe which part of a single file should be read.

    Instances pair a filename with an optional range and are handed to
    FileFormatRoots as their inputs.
    """

    # Keys used in the JSON representation.
    FILENAME = 'filename'
    RANGE = 'range'

    def __init__(self, filename, file_range=None):
        """Init.

        Args:
          filename: filename in str.
          file_range: [start_index, end_index) tuple. This only makes sense
            for _FileFormats that support splitting within a file.
            It specifies the range of this file to read.
            None means reading the entire file. When defined, what it means
            differs for each format. For example, if a file is of zip
            format, index specifies the member files to read. If a file is
            of record format, index specifies the records to read.
        """
        self.filename = filename
        self.range = file_range

    def to_json(self):
        """Return a dict holding the filename and the range."""
        serialized = {}
        serialized[self.FILENAME] = self.filename
        serialized[self.RANGE] = self.range
        return serialized

    @classmethod
    def from_json(cls, json):
        """Rebuild an instance from a dict produced by to_json."""
        filename = json[cls.FILENAME]
        file_range = json[cls.RANGE]
        return cls(filename, file_range)
class FileFormatRoot(model.JsonMixin):
    """Drive a chain of FileFormats over a list of _FileRanges.

    FileFormatRoot takes a list of FileFormats as processing units and
    a list of _FileRanges as inputs, and exposes an iterator over the
    results. Inputs pass through the processing units in a cascaded manner
    before being emitted.

    The order of the FileFormats matters: the output of the last FileFormat
    is what FileFormatRoot returns. Each FileFormat obtains its inputs
    through a files stream attached by the root; the first format's stream
    reads the root's own inputs, later streams read from the preceding
    format.

    FileFormats don't know each other. FileFormatRoot coordinates all
    their initializations, (de)serialization, and communications.
    """

    # Keys used in the JSON representation.
    _INPUTS = 'inputs'
    _FORMATS = 'formats'
    _FILES_STREAMS = 'files_streams'

    def __init__(self, formats, inputs, files_streams_json=None):
        """Init.

        Args:
          formats: A list of _FileFormats.
          inputs: A list of _FileRanges.
          files_streams_json: optional list with one serialized files stream
            per format. When falsy, every stream is created with its
            default state instead.
        """
        self._inputs = inputs
        self._formats = formats
        for position, file_format in enumerate(self._formats):
            # Only the first format reads straight from the root's inputs.
            if position == 0:
                stream_cls = _RootFilesStream
            else:
                stream_cls = _FilesStream

            if not files_streams_json:
                stream = stream_cls(position, self)
            else:
                stream = stream_cls.from_json(files_streams_json[position],
                                              self)
            file_format._input_files_stream = stream

    def __repr__(self):
        return str(self.to_json())

    def __iter__(self):
        return self

    def to_json(self):
        """Serialize the inputs, the formats and each format's stream."""
        serialized_inputs = [file_range.to_json()
                             for file_range in self._inputs]
        serialized_formats = [file_format.to_json()
                              for file_format in self._formats]
        serialized_streams = [file_format._input_files_stream.to_json()
                              for file_format in self._formats]
        return {self._INPUTS: serialized_inputs,
                self._FORMATS: serialized_formats,
                self._FILES_STREAMS: serialized_streams}

    @classmethod
    def from_json(cls, json):
        """Rebuild a root from a dict produced by to_json."""
        formats = []
        for format_json in json[cls._FORMATS]:
            # The serialized format names the class that can restore it.
            format_name = format_json[file_formats.FileFormat._FORMAT]
            format_cls = file_formats.FORMATS[format_name]
            formats.append(format_cls.from_json(format_json))

        inputs = [_FileRange.from_json(entry) for entry in json[cls._INPUTS]]
        return cls(formats, inputs, json[cls._FILES_STREAMS])

    def next(self):
        """Iterate over inputs."""
        last_format = self._formats[-1]
        result = last_format.next()
        # Record progress on the last format and its stream after each
        # emitted result.
        last_format._input_files_stream.checkpoint()
        last_format.checkpoint()
        return result
class _FilesStream(object):
    """Provide FileFormat with a stream of file-like objects as inputs.

    Attributes:
      current: the current file-like object to read from.
    """

    # Keys used in the JSON representation.
    PREVIOUS_OFFSET = 'previous'
    INDEX = 'index'

    def __init__(self,
                 index,
                 file_format_root,
                 offset=0,
                 next_func=None):
        """Init.

        The first file is fetched, preprocessed and positioned at offset
        right away.

        Args:
          index: the index of this stream within the FileFormatRoot.
          file_format_root: the FileFormatRoot this stream should talk to.
          offset: the offset to start reading current file.
          next_func: a function that gives back the next file from the
            stream. Defaults to the next method of the preceding format.
        """
        formats = file_format_root._formats
        if next_func:
            self._next_file = next_func
        else:
            self._next_file = formats[index - 1].next
        self._preprocess = formats[index].preprocess

        self._previous_offset = offset
        self._index = index

        first_file = self._next_file()
        self._current = self._preprocess(first_file)
        self._current.seek(offset)

    def advance(self):
        """Advance _current to the next file-like object.

        To be called once the current file-like object has been consumed.
        The current object is closed and the saved offset is reset to 0.
        """
        self._previous_offset = 0
        self._current.close()
        upcoming = self._next_file()
        self._current = self._preprocess(upcoming)

    @property
    def current(self):
        return self._current

    def checkpoint(self):
        """Remember the current read position of the current file."""
        self._previous_offset = self._current.tell()

    def to_json(self):
        """Return a dict with the saved offset and this stream's index."""
        serialized = {}
        serialized[self.PREVIOUS_OFFSET] = self._previous_offset
        serialized[self.INDEX] = self._index
        return serialized

    @classmethod
    def from_json(cls, json, file_format_root):
        """Rebuild a stream from a dict produced by to_json."""
        index = json[cls.INDEX]
        offset = json[cls.PREVIOUS_OFFSET]
        return cls(index, file_format_root, offset)
class _RootFilesStream(_FilesStream):
    """Special FilesStream for the first FileFormat.

    Rather than pulling files from a preceding format, this stream opens
    the FileFormatRoot's own inputs one after another.
    """

    # Extra key used in the JSON representation.
    PREVIOUS_INPUT_INDEX = 'input_index'

    def __init__(self,
                 index,
                 file_format_root,
                 offset=0,
                 input_index=0):
        """Init.

        Args:
          index: the index of this stream within the FileFormatRoot.
          file_format_root: the FileFormatRoot this stream should talk to.
          offset: the offset to start reading current file.
          input_index: index of the next input file to read.
        """
        self.__inputs = file_format_root._inputs
        self.__input_index = input_index
        self.__previous_input_index = input_index
        self.__file_format_root = file_format_root

        # The base initializer fetches the first file right away, so all of
        # the state next_file relies on has to be in place before this call.
        super(_RootFilesStream, self).__init__(index,
                                               file_format_root,
                                               offset,
                                               self.next_file)

    def next_file(self):
        """Open and return the next input file.

        If the input carries a range, it is handed to the first format
        before the file is opened.

        Raises:
          StopIteration: when every input has been handed out.
          ValueError: when an input carries a range but the first format
            cannot split.
        """
        position = self.__input_index
        if position == len(self.__inputs):
            raise StopIteration()

        file_input = self.__inputs[position]
        if file_input.range:
            first_format = self.__file_format_root._formats[0]
            if not first_format.can_split():
                raise ValueError(
                    'Input range specified for a non splitable format %s'
                    % first_format.NAME)
            first_format._range = file_input.range

        # Remember which input is being opened; this is the value that
        # to_json stores.
        self.__previous_input_index = position
        self.__input_index = position + 1
        return files.open(file_input.filename, 'r', buffering=-1)

    def to_json(self):
        """Extend the base dict with the index of the current input."""
        serialized = super(_RootFilesStream, self).to_json()
        serialized[self.PREVIOUS_INPUT_INDEX] = self.__previous_input_index
        return serialized

    @classmethod
    def from_json(cls, json, file_format_root):
        """Rebuild a root stream from a dict produced by to_json."""
        index = json[cls.INDEX]
        offset = json[cls.PREVIOUS_OFFSET]
        input_index = json[cls.PREVIOUS_INPUT_INDEX]
        return cls(index, file_format_root, offset, input_index)