# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Define file format root."""

from __future__ import with_statement

# NOTE(review): the list was cut off after its first entry; 'split' is the
# only other public name defined in this module -- confirm.
__all__ = ['FileFormatRoot',
           'split']

import copy

from google.appengine.api.files import file as files
from google.appengine.ext.mapreduce import file_formats
from google.appengine.ext.mapreduce import json_util
import google.appengine.ext.mapreduce.file_format_parser as parser


def split(filenames, format_string, shards):
    """Get a FileFormatRoot for each shard.

    This method creates a list of FileFormatRoot and assigns each root
    some input files. The number of roots is less than or equal to shards.

    Args:
      filenames: input filenames.
      format_string: format string from user.
      shards: number of shards to split inputs.

    Returns:
      A list of FileFormatRoot or None if all input files have zero bytes.
    """
    parsed_formats = parser.parse(format_string)

    sizes = [files.stat(filename).st_size for filename in filenames]

    # Float division so that a small total size is not truncated to zero.
    size_per_shard = float(sum(sizes)) / shards
    if not size_per_shard:
        # Total size is zero: there is nothing to hand to any shard.
        return None

    # Only the first format sees the raw files, so it alone decides whether
    # a single file may be divided between several roots.
    if parsed_formats[0].can_split():
        return _deep_split(filenames, size_per_shard, parsed_formats)
    return _shallow_split(filenames, size_per_shard, parsed_formats, sizes)


def _shallow_split(filenames, size_per_shard, parsed_formats, sizes):
    """Split files into roots only based on top level file sizes.

    This split does not cross file boundary.

    Args:
      filenames: a list of input filenames.
      size_per_shard: size per shard.
      parsed_formats: the parsed FileFormats.
      sizes: size of each input file, in the same order as filenames.

    Returns:
      A list of FileFormatRoot.
    """
    # NOTE(review): the accumulator setup/reset and the final return were
    # rebuilt from how `roots`, `inputs` and `shard_size` are used below;
    # confirm against the upstream file.
    roots = []
    inputs = []
    shard_size = 0
    for i, size in enumerate(sizes):
        shard_size += size
        inputs.append(_FileRange(filenames[i], None))
        if shard_size >= size_per_shard:
            # Each root gets its own copy of the formats so that roots do
            # not share format objects.
            roots.append(FileFormatRoot(copy.deepcopy(parsed_formats), inputs))
            inputs = []
            shard_size = 0

    # Files left over after the last full shard form one final root.
    if inputs:
        roots.append(FileFormatRoot(copy.deepcopy(parsed_formats), inputs))

    return roots


def _deep_split(filenames, size_per_shard, parsed_formats):
    """Split files into roots using the first FileFormat.

    Deep split can split within a file. It tells the first format how big
    a split it wants and the first format will do the actual splitting
    because only the first format knows how to operate on this particular
    format.

    Args:
      filenames: a list of input filenames.
      size_per_shard: size per shard.
      parsed_formats: the parsed FileFormats.

    Returns:
      A list of FileFormatRoot.
    """
    roots = []
    inputs = []
    size_left = size_per_shard

    for filename in filenames:
        index = 0
        with files.open(filename) as f:
            # NOTE(review): the trailing arguments of split() (start index,
            # open file, per-file cache dict) are not visible in the
            # truncated call; confirm against FileFormat.split's signature.
            cache_for_split = {}

            while True:
                if size_left <= 0:
                    # Current shard is full: emit it and start a fresh one.
                    roots.append(
                        FileFormatRoot(copy.deepcopy(parsed_formats), inputs))
                    size_left = size_per_shard
                    inputs = []
                start_index = index
                size_left, index = parsed_formats[0].split(size_left,
                                                           start_index,
                                                           f,
                                                           cache_for_split)
                # No progress means this file has nothing more to split.
                if start_index == index:
                    break
                inputs.append(_FileRange(filename, (start_index, index)))

    # Ranges left over after the last full shard form one final root.
    if inputs:
        roots.append(FileFormatRoot(copy.deepcopy(parsed_formats), inputs))

    return roots


class _FileRange(json_util.JsonMixin):
    """Describe a range of a file to read.

    FileFormatRootFactory creates instances of this class and
    feeds them to different roots.
    """

    # Keys of the dict produced by to_json and consumed by from_json.
    FILENAME = 'filename'
    # NOTE(review): only the FILENAME value is visible; 'range' is assumed
    # for RANGE -- confirm before relying on persisted JSON.
    RANGE = 'range'

    def __init__(self, filename, file_range=None):
        """Init.

        Args:
          filename: filename in str.
          file_range: [start_index, end_index) tuple. This only makes sense
            for _FileFormats that support splitting within a file.
            It specifies the range to read from this file.
            None means reading the entire file. When defined, what it means
            differs for each format. For example, if a file is of zip
            format, index specifies the member files to read. If a file is
            of record format, index specifies the records to read.
        """
        self.filename = filename
        self.range = file_range

    def to_json(self):
        """Return a dict holding the filename and the range."""
        return {self.FILENAME: self.filename,
                self.RANGE: self.range}

    @classmethod
    def from_json(cls, json):
        """Build an instance from a dict shaped like to_json's output."""
        return cls(json[cls.FILENAME], json[cls.RANGE])


class FileFormatRoot(json_util.JsonMixin):
    """FileFormatRoot.

    FileFormatRoot takes a list of FileFormats as processing units and
    a list of _FileRanges as inputs. It provides an interface to
    iterate through all the inputs. All inputs will be processed by each
    processing unit in a cascaded manner before being emitted.

    The order of the list of FileFormats matters. The last
    FileFormat's output is returned by FileFormatRoot.
    Each FileFormat asks FileFormatRoot for inputs, which are either outputs
    from its previous FileFormat or, in the case of the first FileFormat,
    outputs directly from FileFormatRoot.

    FileFormats don't know each other. FileFormatRoot coordinates all
    their initializations, (de)serialization, and communications.
    """

    # Keys of the dict produced by to_json and consumed by from_json.
    # NOTE(review): only the _FILES_STREAMS value is visible; the other two
    # values are assumed -- confirm before relying on persisted JSON.
    _INPUTS = 'inputs'
    _FORMATS = 'formats'
    _FILES_STREAMS = 'files_streams'

    def __init__(self, formats, inputs, files_streams_json=None):
        """Init.

        Args:
          formats: A list of _FileFormats.
          inputs: A list of _FileRanges.
          files_streams_json: optional list holding one serialized files
            stream per format, in the same order as formats. When falsy,
            every stream is created with its default arguments.
        """
        self._inputs = inputs
        self._formats = formats
        for i, file_format in enumerate(self._formats):
            # The first format reads the raw inputs; every later format
            # reads what the format before it produces.
            stream_cls = _RootFilesStream if i == 0 else _FilesStream
            if files_streams_json:
                file_format._input_files_stream = stream_cls.from_json(
                    files_streams_json[i], self)
            else:
                file_format._input_files_stream = stream_cls(i, self)

    # NOTE(review): the header of this method is not visible; __repr__ is
    # assumed from its body -- confirm.
    def __repr__(self):
        return str(self.to_json())

    # NOTE(review): assumed from the class docstring ("provides an interface
    # to iterate through all the inputs") and next() below -- confirm.
    def __iter__(self):
        return self

    def to_json(self):
        """Return a dict with the inputs, formats and files streams."""
        return {self._INPUTS: [_.to_json() for _ in self._inputs],
                self._FORMATS: [_.to_json() for _ in self._formats],
                self._FILES_STREAMS:
                    [_._input_files_stream.to_json() for _ in self._formats]}

    @classmethod
    def from_json(cls, json):
        """Build a root from a dict shaped like to_json's output."""
        # Each serialized format names its own class through the _FORMAT
        # key; file_formats.FORMATS maps that name to the class.
        formats = [file_formats.FORMATS[_json[file_formats.FileFormat._FORMAT]].
                   from_json(_json) for _json in json[cls._FORMATS]]

        root = cls(formats,
                   [_FileRange.from_json(_) for _ in json[cls._INPUTS]],
                   json[cls._FILES_STREAMS])

        return root

    def next(self):
        """Iterate over inputs."""
        result = self._formats[-1].next()
        # Checkpoint the last format's input stream and the format itself
        # once a result has been obtained.
        self._formats[-1]._input_files_stream.checkpoint()
        self._formats[-1].checkpoint()
        return result


class _FilesStream(object):
    """Provide FileFormat with a stream of file-like objects as inputs.

    Attributes:
      current: the current file-like object to read from.
    """

    # Keys of the dict produced by to_json and consumed by from_json.
    PREVIOUS_OFFSET = 'previous'
    # NOTE(review): only the PREVIOUS_OFFSET value is visible; 'index' is
    # assumed for INDEX -- confirm before relying on persisted JSON.
    INDEX = 'index'

    def __init__(self,
                 index,
                 file_format_root,
                 offset=0,
                 next_func=None):
        """Init.

        Args:
          index: the index of this stream within the FileFormatRoot.
          file_format_root: the FileFormatRoot this stream should talk to.
          offset: the offset to start reading current file.
          next_func: a function that gives back the next file from the
            stream. Defaults to the next() of the format that precedes this
            stream's format in the root.
        """
        self._next_file = next_func or file_format_root._formats[index-1].next
        self._preprocess = file_format_root._formats[index].preprocess

        self._previous_offset = offset
        self._index = index
        # Open the first file right away and position it at offset.
        self._current = self._preprocess(self._next_file())
        self._current.seek(offset)

    def advance(self):
        """Advance _current to the next file-like object.

        Call this after the current file-like object has been consumed.
        """
        self._previous_offset = 0
        self._current.close()
        self._current = self._preprocess(self._next_file())

    @property
    def current(self):
        """The current file-like object to read from."""
        return self._current

    def checkpoint(self):
        """Record the current file's position as the offset to serialize."""
        self._previous_offset = self._current.tell()

    def to_json(self):
        """Return a dict with the last recorded offset and the index."""
        return {self.PREVIOUS_OFFSET: self._previous_offset,
                self.INDEX: self._index}

    @classmethod
    def from_json(cls, json, file_format_root):
        """Build a stream from a dict shaped like to_json's output."""
        return cls(json[cls.INDEX], file_format_root, json[cls.PREVIOUS_OFFSET])


class _RootFilesStream(_FilesStream):
    """Special FilesStream for the first FileFormat"""

    # Extra key added to the dict produced by to_json.
    PREVIOUS_INPUT_INDEX = 'input_index'

    def __init__(self,
                 index,
                 file_format_root,
                 offset=0,
                 input_index=0):
        """Init.

        Args:
          index: the index of this stream within the FileFormatRoot.
          file_format_root: the FileFormatRoot this stream should talk to.
          offset: the offset to start reading current file.
          input_index: index of the next input file to read.
        """
        # These must be set before the parent initializer runs, because the
        # parent immediately calls the next-file function passed to it.
        self.__inputs = file_format_root._inputs
        self.__input_index = input_index
        self.__previous_input_index = input_index
        self.__file_format_root = file_format_root

        # NOTE(review): the trailing arguments of this call are not visible;
        # they are assumed to forward offset and this class's own file
        # opener -- confirm.
        super(_RootFilesStream, self).__init__(index,
                                               file_format_root,
                                               offset,
                                               self.next_file)

    # NOTE(review): the header of this method is not visible; the name
    # next_file is assumed -- confirm.
    def next_file(self):
        """Open and return the next input file.

        Raises:
          StopIteration: when every input has been handed out.
          ValueError: when an input carries a range but the first format
            cannot split.
        """
        if self.__input_index == len(self.__inputs):
            raise StopIteration()
        file_input = self.__inputs[self.__input_index]
        # NOTE(review): this guard is assumed from the error message below,
        # which only makes sense when a range was specified -- confirm.
        if file_input.range:
            first_format = self.__file_format_root._formats[0]
            if not first_format.can_split():
                # NOTE(review): the %-argument is not visible; the format's
                # NAME attribute is assumed -- confirm.
                raise ValueError('Input range specified for a non splitable format %s'
                                 % first_format.NAME)
            first_format._range = file_input.range
        # Remember which input is being opened so to_json can report it.
        self.__previous_input_index = self.__input_index
        self.__input_index += 1
        return files.open(file_input.filename, 'r', buffering=-1)

    def to_json(self):
        """Return the parent's dict plus the index of the input last opened."""
        result = super(_RootFilesStream, self).to_json()
        result[self.PREVIOUS_INPUT_INDEX] = self.__previous_input_index
        return result

    @classmethod
    def from_json(cls, json, file_format_root):
        """Build a stream from a dict shaped like to_json's output."""
        return cls(json[cls.INDEX],
                   file_format_root,
                   json[cls.PREVIOUS_OFFSET],
                   json[cls.PREVIOUS_INPUT_INDEX])