App Engine Python SDK version 1.8.9
[gae.git] / python / google / appengine / ext / mapreduce / file_format_root.py
blobf37d475c90b7b5b8e5d794b79f25fc1dbf8e7423
1 #!/usr/bin/env python
3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
31 """Define file format root."""
33 from __future__ import with_statement
37 __all__ = ['FileFormatRoot',
38 'split']
40 import copy
41 from google.appengine.api.files import file as files
42 from google.appengine.ext.mapreduce import file_formats
43 from google.appengine.ext.mapreduce import json_util
44 import google.appengine.ext.mapreduce.file_format_parser as parser
def split(filenames, format_string, shards):
  """Build the FileFormatRoots that together cover all input files.

  The format string is parsed first, then every input file is stat'ed to
  learn its size. The average number of bytes per shard decides how the
  inputs are grouped. Whether a file may be cut in the middle depends on
  the first parsed format's can_split().

  Args:
    filenames: input filenames.
    format_string: format string from user.
    shards: number of shards to split inputs into.

  Returns:
    A list of FileFormatRoot, or None if all input files have zero bytes.
  """
  formats = parser.parse(format_string)
  file_sizes = [files.stat(name).st_size for name in filenames]

  # Float division so that a small total does not truncate to zero.
  target_size = float(sum(file_sizes)) / shards
  if target_size == 0:
    return None

  # Only the outermost (first) format decides whether files can be cut.
  first_format = formats[0]
  if not first_format.can_split():
    return _shallow_split(filenames, target_size, formats, file_sizes)
  return _deep_split(filenames, target_size, formats)
def _shallow_split(filenames, size_per_shard, parsed_formats, sizes):
  """Group whole files into roots using only their top level sizes.

  Files are taken in order and accumulated until their combined size
  reaches size_per_shard; the accumulated group then becomes one root.
  A file is never divided between two roots.

  Args:
    filenames: a list of input filenames.
    size_per_shard: size per shard.
    parsed_formats: the parsed FileFormats; each root gets its own deep copy.
    sizes: size of each input file, in the same order as filenames.

  Returns:
    A list of FileFormatRoot.
  """
  roots = []
  pending = []
  accumulated = 0
  for position, file_size in enumerate(sizes):
    accumulated += file_size
    # A range of None stands for "read the entire file".
    pending.append(_FileRange(filenames[position], None))
    if accumulated >= size_per_shard:
      roots.append(FileFormatRoot(copy.deepcopy(parsed_formats), pending))
      pending, accumulated = [], 0

  # Whatever did not fill a complete shard still needs a root.
  if pending:
    roots.append(FileFormatRoot(copy.deepcopy(parsed_formats), pending))

  return roots
def _deep_split(filenames, size_per_shard, parsed_formats):
  """Split files into roots using the first FileFormat.

  Deep split can split within a file. It tells the first format how big
  a split it wants and the first format does the actual splitting,
  because only the first format knows how to operate on this particular
  format.

  Args:
    filenames: a list of input filenames.
    size_per_shard: size per shard.
    parsed_formats: the parsed FileFormats. The first one performs the
      splitting; every root receives its own deep copy of the whole list.

  Returns:
    A list of FileFormatRoot.
  """
  roots = []
  inputs = []
  size_left = size_per_shard

  for filename in filenames:
    index = 0
    with files.open(filename) as f:
      # Scratch dict handed to every split() call made for this file.
      cache_for_split = {}

      while True:
        if size_left <= 0:
          # The current shard is full: seal it and start a new one.
          roots.append(FileFormatRoot(copy.deepcopy(parsed_formats), inputs))
          size_left = size_per_shard
          inputs = []
        start_index = index
        # The open file handle must be passed along: the format can only
        # find split points by inspecting the file itself.
        size_left, index = parsed_formats[0].split(size_left,
                                                   start_index,
                                                   f,
                                                   cache_for_split)

        # An unchanged index means split() made no progress on this file,
        # so move on to the next one.
        if start_index == index:
          break
        inputs.append(_FileRange(filename, (start_index, index)))

  # Ranges collected after the last full shard form the final root.
  if inputs:
    roots.append(FileFormatRoot(copy.deepcopy(parsed_formats), inputs))

  return roots
class _FileRange(json_util.JsonMixin):
  """Describe a range of a file to read.

  The split helpers in this module create instances of this class and
  hand them to FileFormatRoots as inputs.
  """

  # Keys of the dict produced by to_json and consumed by from_json.
  FILENAME = 'filename'
  RANGE = 'range'

  def __init__(self, filename, file_range=None):
    """Init.

    Args:
      filename: filename in str.
      file_range: [start_index, end_index) tuple. This only makes sense for
        _FileFormats that support splitting within a file.
        It specifies the range of this file to read.
        None means reading the entire file. When defined, what it means
        differs for each format. For example, if a file is of zip format,
        index specifies the member files to read. If a file is of record
        format, index specifies the records to read.
    """
    self.filename = filename
    self.range = file_range

  def to_json(self):
    """Return a dict holding the filename and the range."""
    serialized = {}
    serialized[self.FILENAME] = self.filename
    serialized[self.RANGE] = self.range
    return serialized

  @classmethod
  def from_json(cls, json):
    """Rebuild an instance from a dict shaped like to_json's output."""
    filename = json[cls.FILENAME]
    file_range = json[cls.RANGE]
    return cls(filename, file_range)
class FileFormatRoot(json_util.JsonMixin):
  """FileFormatRoot.

  FileFormatRoot takes a list of FileFormats as processing units and
  a list of _FileRanges as inputs. It provides an interface to
  iterate through all the inputs. All inputs will be processed by each
  processing unit in a cascaded manner before being emitted.

  The order of the list of FileFormats matters. The last
  FileFormat's output is returned by FileFormatRoot.
  Each FileFormat asks FileFormatRoot for inputs, which are either outputs
  from its previous FileFormat or, in the case of the first FileFormat,
  outputs directly from FileFormatRoot.

  FileFormats don't know each other. FileFormatRoot coordinates all
  their initializations, (de)serialization, and communications.
  """

  # Keys of the dict produced by to_json and consumed by from_json.
  _INPUTS = 'inputs'
  _FORMATS = 'formats'
  _FILES_STREAMS = 'files_streams'

  def __init__(self, formats, inputs, files_streams_json=None):
    """Init.

    Args:
      formats: A list of _FileFormats.
      inputs: A list of _FileRanges.
      files_streams_json: serialized state of the input stream of each
        format, in the same order as formats. When empty or None every
        format gets a freshly constructed stream instead.
    """
    self._inputs = inputs
    self._formats = formats
    for position, file_format in enumerate(self._formats):
      # Only the first format reads the input files directly; all later
      # ones read from the format in front of them.
      if position == 0:
        stream_cls = _RootFilesStream
      else:
        stream_cls = _FilesStream

      if files_streams_json:
        stream = stream_cls.from_json(files_streams_json[position], self)
      else:
        stream = stream_cls(position, self)
      file_format._input_files_stream = stream

  def __repr__(self):
    return str(self.to_json())

  def __iter__(self):
    return self

  def to_json(self):
    """Serialize the inputs, the formats and each format's input stream."""
    inputs_json = [file_range.to_json() for file_range in self._inputs]
    formats_json = [file_format.to_json() for file_format in self._formats]
    streams_json = [file_format._input_files_stream.to_json()
                    for file_format in self._formats]
    return {self._INPUTS: inputs_json,
            self._FORMATS: formats_json,
            self._FILES_STREAMS: streams_json}

  @classmethod
  def from_json(cls, json):
    """Rebuild a root from a dict shaped like to_json's output."""
    formats = []
    for format_json in json[cls._FORMATS]:
      # Each serialized format names its own class in the FORMATS registry.
      format_name = format_json[file_formats.FileFormat._FORMAT]
      format_cls = file_formats.FORMATS[format_name]
      formats.append(format_cls.from_json(format_json))

    inputs = [_FileRange.from_json(range_json)
              for range_json in json[cls._INPUTS]]
    return cls(formats, inputs, json[cls._FILES_STREAMS])

  def next(self):
    """Return the next output of the last format, then checkpoint it."""
    last_format = self._formats[-1]
    result = last_format.next()
    # Record progress only after a value has been produced successfully.
    last_format._input_files_stream.checkpoint()
    last_format.checkpoint()
    return result
class _FilesStream(object):
  """Provide FileFormat with a stream of file-like objects as inputs.

  Attributes:
    current: the current file-like object to read from.
  """

  # Keys of the dict produced by to_json and consumed by from_json.
  PREVIOUS_OFFSET = 'previous'
  INDEX = 'index'

  def __init__(self,
               index,
               file_format_root,
               offset=0,
               next_func=None):
    """Init.

    The first file is fetched and positioned at offset right away.

    Args:
      index: the index of this stream within the FileFormatRoot.
      file_format_root: the FileFormatRoot this stream should talk to.
      offset: the offset to start reading current file.
      next_func: a function that gives back the next file from the stream.
        When not given, the next() of the format in front of this one
        (index - 1) is used.
    """
    formats = file_format_root._formats
    if next_func:
      self._next_file = next_func
    else:
      self._next_file = formats[index - 1].next
    self._preprocess = formats[index].preprocess

    self._previous_offset = offset
    self._index = index
    self._current = self._preprocess(self._next_file())
    self._current.seek(offset)

  def advance(self):
    """Advance _current to the next file-like object.

    To be called once the current file-like object has been consumed.
    """
    # A new file is always read from its beginning.
    self._previous_offset = 0
    self._current.close()
    upcoming = self._next_file()
    self._current = self._preprocess(upcoming)

  @property
  def current(self):
    return self._current

  def checkpoint(self):
    """Remember the current position of the current file."""
    self._previous_offset = self._current.tell()

  def to_json(self):
    """Return a dict with the last checkpointed offset and the index."""
    state = {}
    state[self.PREVIOUS_OFFSET] = self._previous_offset
    state[self.INDEX] = self._index
    return state

  @classmethod
  def from_json(cls, json, file_format_root):
    """Rebuild a stream from to_json's output for the given root."""
    index = json[cls.INDEX]
    offset = json[cls.PREVIOUS_OFFSET]
    return cls(index, file_format_root, offset)
class _RootFilesStream(_FilesStream):
  """Special FilesStream for the first FileFormat.

  Instead of reading from a previous format, it opens the input files of
  the FileFormatRoot one after another.
  """

  # Extra key added to the dict produced by to_json.
  PREVIOUS_INPUT_INDEX = 'input_index'

  def __init__(self,
               index,
               file_format_root,
               offset=0,
               input_index=0):
    """Init.

    Args:
      index: the index of this stream within the FileFormatRoot.
      file_format_root: the FileFormatRoot this stream should talk to.
      offset: the offset to start reading current file.
      input_index: index of the next input file to read.
    """
    # These must be in place before the base constructor runs, because it
    # immediately calls next_file.
    self.__inputs = file_format_root._inputs
    self.__input_index = input_index
    self.__previous_input_index = input_index
    self.__file_format_root = file_format_root

    super(_RootFilesStream, self).__init__(index,
                                           file_format_root,
                                           offset,
                                           self.next_file)

  def next_file(self):
    """Open and return the next input file.

    Returns:
      The file object for the next input, opened for reading.

    Raises:
      StopIteration: all inputs have been handed out.
      ValueError: the input carries a range but the first format is not
        able to split.
    """
    position = self.__input_index
    if position == len(self.__inputs):
      raise StopIteration()

    file_input = self.__inputs[position]
    if file_input.range:
      first_format = self.__file_format_root._formats[0]
      if not first_format.can_split():
        raise ValueError('Input range specified for a non splitable format %s'
                         % first_format.NAME)
      # Tell the first format which part of the file it should read.
      first_format._range = file_input.range

    # Remember the file being opened now; this is what gets serialized.
    self.__previous_input_index = position
    self.__input_index = position + 1
    return files.open(file_input.filename, 'r', buffering=-1)

  def to_json(self):
    """Return the base class state plus the index of the current input."""
    state = super(_RootFilesStream, self).to_json()
    state[self.PREVIOUS_INPUT_INDEX] = self.__previous_input_index
    return state

  @classmethod
  def from_json(cls, json, file_format_root):
    """Rebuild a stream from to_json's output for the given root."""
    index = json[cls.INDEX]
    offset = json[cls.PREVIOUS_OFFSET]
    input_index = json[cls.PREVIOUS_INPUT_INDEX]
    return cls(index, file_format_root, offset, input_index)