3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
31 """Define file format root."""
33 from __future__
import with_statement
37 __all__
= ['FileFormatRoot',
41 import google
.appengine
.ext
.mapreduce
.file_format_parser
as parser
43 from google
.appengine
.api
.files
import file as files
44 from google
.appengine
.ext
.mapreduce
import model
45 from google
.appengine
.ext
.mapreduce
import file_formats
def split(filenames, format_string, shards):
  """Get a FileFormatRoot for each shard.

  This method creates a list of FileFormatRoot and assigns each root
  some input files. The number of roots is less than or equal to shards.

  Args:
    filenames: input filenames.
    format_string: format string from user.
    shards: number of shards to split inputs.

  Returns:
    A list of FileFormatRoot or None if all input files have zero bytes.
  """
  parsed_formats = parser.parse(format_string)

  sizes = [files.stat(filename).st_size for filename in filenames]

  # float() forces true division so the per-shard budget keeps its
  # fractional part.
  size_per_shard = float(sum(sizes)) / shards
  if not size_per_shard:
    # Total size is zero: there is nothing to hand out to any shard.
    return None

  # Only the first format decides whether inputs may be cut inside a file.
  if parsed_formats[0].can_split():
    return _deep_split(filenames, size_per_shard, parsed_formats)
  return _shallow_split(filenames, size_per_shard, parsed_formats, sizes)
def _shallow_split(filenames, size_per_shard, parsed_formats, sizes):
  """Split files into roots only based on top level file sizes.

  This split does not cross file boundary: every input is handed to a root
  as a whole file (its _FileRange carries no range).

  Args:
    filenames: a list of input filenames.
    size_per_shard: size budget per shard.
    parsed_formats: the parsed FileFormats; every root receives its own
      deep copy.
    sizes: sizes of the inputs, in the same order as filenames.

  Returns:
    A list of FileFormatRoot.
  """
  # NOTE(review): the accumulator set-up/reset lines and the final return
  # were not visible in the reviewed excerpt; they are reconstructed from
  # the visible loop body -- confirm against the full file.
  roots = []
  inputs = []
  shard_size = 0
  for i, size in enumerate(sizes):
    shard_size += size
    inputs.append(_FileRange(filenames[i], None))
    if shard_size > size_per_shard:
      # Budget exceeded: close the current group and start a new one.
      roots.append(FileFormatRoot(copy.deepcopy(parsed_formats), inputs))
      inputs = []
      shard_size = 0

  # Whatever is left over forms the last root.
  if inputs:
    roots.append(FileFormatRoot(copy.deepcopy(parsed_formats), inputs))

  return roots
def _deep_split(filenames, size_per_shard, parsed_formats):
  """Split files into roots using the first FileFormat.

  Deep split can split within a file. It tells the first format how big
  a split it wants and the first format will do the actual splitting
  because only the first format knows how to operate on this particular
  format.

  Args:
    filenames: a list of input filenames.
    size_per_shard: size per shard.
    parsed_formats: the parsed FileFormats.

  Returns:
    A list of FileFormatRoot.
  """
  # NOTE(review): only fragments of this function were visible in the
  # reviewed excerpt. The loop skeleton, the start_index/cache bookkeeping
  # and the trailing arguments of the split() call below are reconstructed
  # -- confirm against the full file and the FileFormat.split signature.
  roots = []
  inputs = []
  size_left = size_per_shard

  for filename in filenames:
    index = 0
    with files.open(filename) as f:
      cache_for_split = {}

      while True:
        if size_left <= 0:
          # Current shard is full: emit it and start a fresh budget.
          roots.append(FileFormatRoot(copy.deepcopy(parsed_formats), inputs))
          size_left = size_per_shard
          inputs = []
        start_index = index
        size_left, index = parsed_formats[0].split(size_left,
                                                   start_index,
                                                   f,
                                                   cache_for_split)
        # No progress means this file has been fully consumed.
        if start_index == index:
          break
        inputs.append(_FileRange(filename, (start_index, index)))

  if inputs:
    roots.append(FileFormatRoot(copy.deepcopy(parsed_formats), inputs))

  return roots
class _FileRange(model.JsonMixin):
  """Describe a range of a file to read.

  The module-level split helpers create instances of this class and
  feed them to different roots.
  """

  FILENAME = 'filename'
  # NOTE(review): the RANGE constant is referenced by to_json/from_json but
  # its definition was not visible in the reviewed excerpt; the value below
  # is assumed -- confirm against the full file.
  RANGE = 'range'

  def __init__(self, filename, file_range=None):
    """Constructor.

    Args:
      filename: filename in str.
      file_range: [start_index, end_index) tuple. This only makes sense for
        _FileFormats that support splitting within a file.
        It specifies the range to read this file.
        None means reading the entire file. When defined, what it means
        differs for each format. For example, if a file is of zip format,
        index specifies the member files to read. If a file is of record
        format, index specifies the records to read.
    """
    self.filename = filename
    self.range = file_range

  def to_json(self):
    """Return a dict holding the filename and the range."""
    return {self.FILENAME: self.filename,
            self.RANGE: self.range}

  @classmethod
  def from_json(cls, json):
    """Rebuild a _FileRange from the dict produced by to_json."""
    return cls(json[cls.FILENAME], json[cls.RANGE])
class FileFormatRoot(model.JsonMixin):
  """Coordinate a chain of FileFormats over a list of inputs.

  FileFormatRoot takes a list of FileFormats as processing units and
  a list of _FileRanges as inputs. It provides an interface to
  iterate through all the inputs. All inputs will be processed by each
  processing unit in a cascaded manner before being emitted.

  The order of the list of FileFormats matters. The last
  FileFormat's output is returned by FileFormatRoot.
  Each FileFormat asks FileFormatRoot for inputs, which are either outputs
  from its previous FileFormat or, in the case of the first FileFormat,
  outputs directly from FileFormatRoot.

  FileFormats don't know each other. FileFormatRoot coordinates all
  their initializations, (de)serialization, and communications.
  """

  # NOTE(review): _INPUTS and _FORMATS are referenced below but their
  # definitions were not visible in the reviewed excerpt; the values are
  # assumed -- confirm against the full file.
  _INPUTS = 'inputs'
  _FORMATS = 'formats'
  _FILES_STREAMS = 'files_streams'

  def __init__(self, formats, inputs, files_streams_json=None):
    """Constructor.

    Args:
      formats: A list of _FileFormats.
      inputs: A list of _FileRanges.
      files_streams_json: serialized state of each format's input stream,
        indexed like formats. When falsy, every stream is created with its
        default state.
    """
    self._inputs = inputs
    self._formats = formats
    # Streams are attached in list order: the first format reads straight
    # from the root's inputs, every later one from its predecessor.
    for i, file_format in enumerate(self._formats):
      stream_cls = _RootFilesStream if i == 0 else _FilesStream
      if files_streams_json:
        file_format._input_files_stream = stream_cls.from_json(
            files_streams_json[i], self)
      else:
        file_format._input_files_stream = stream_cls(i, self)

  # NOTE(review): the header of this method was not visible in the reviewed
  # excerpt; __repr__ is used because str() falls back to it as well --
  # confirm against the full file.
  def __repr__(self):
    return str(self.to_json())

  # NOTE(review): not visible in the reviewed excerpt; assumed from the
  # class contract of iterating through all inputs -- confirm.
  def __iter__(self):
    return self

  def to_json(self):
    """Serialize inputs, formats and each format's input stream state."""
    return {self._INPUTS: [_.to_json() for _ in self._inputs],
            self._FORMATS: [_.to_json() for _ in self._formats],
            self._FILES_STREAMS:
            [_._input_files_stream.to_json() for _ in self._formats]}

  @classmethod
  def from_json(cls, json):
    """Rebuild a root from the dict produced by to_json."""
    # Each serialized format names its own class via the _FORMAT key.
    formats = [file_formats.FORMATS[_json[file_formats.FileFormat._FORMAT]].
               from_json(_json) for _json in json[cls._FORMATS]]

    return cls(formats,
               [_FileRange.from_json(_) for _ in json[cls._INPUTS]],
               json[cls._FILES_STREAMS])

  def next(self):
    """Iterate over inputs."""
    result = self._formats[-1].next()
    # Record progress of the last format and its stream once a value has
    # been produced.
    self._formats[-1]._input_files_stream.checkpoint()
    self._formats[-1].checkpoint()
    return result
class _FilesStream(object):
  """Provide FileFormat with a stream of file-like objects as inputs.

  Attributes:
    current: the current file-like object to read from.
  """

  PREVIOUS_OFFSET = 'previous'
  # NOTE(review): INDEX is referenced by to_json/from_json but its
  # definition was not visible in the reviewed excerpt; the value is
  # assumed -- confirm against the full file.
  INDEX = 'index'

  # NOTE(review): the constructor header was not visible; parameter order
  # follows the positional call in from_json, defaults are assumed.
  def __init__(self, index, file_format_root, offset=0, next_func=None):
    """Constructor.

    Args:
      index: the index of this stream within the FileFormatRoot.
      file_format_root: the FileFormatRoot this stream should talk to.
      offset: the offset to start reading current file.
      next_func: a function that gives back the next file from the stream.
        When not given, files come from the previous format's next().
    """
    self._next_file = next_func or file_format_root._formats[index-1].next
    self._preprocess = file_format_root._formats[index].preprocess

    self._previous_offset = offset
    self._index = index
    # The first file is fetched eagerly and positioned at the saved offset.
    self._current = self._preprocess(self._next_file())
    self._current.seek(offset)

  def advance(self):
    """Advance _current to the next file-like object.

    A FileFormat should call this after consuming the current file-like
    object.
    """
    self._previous_offset = 0
    self._current.close()
    self._current = self._preprocess(self._next_file())

  @property
  def current(self):
    """The current file-like object to read from."""
    return self._current

  def checkpoint(self):
    """Remember the current read position of the current file."""
    self._previous_offset = self._current.tell()

  def to_json(self):
    """Return a dict with the last checkpointed offset and the index."""
    return {self.PREVIOUS_OFFSET: self._previous_offset,
            self.INDEX: self._index}

  @classmethod
  def from_json(cls, json, file_format_root):
    """Rebuild a stream from the dict produced by to_json."""
    return cls(json[cls.INDEX], file_format_root, json[cls.PREVIOUS_OFFSET])
class _RootFilesStream(_FilesStream):
  """Special FilesStream for the first FileFormat.

  Instead of pulling from a previous format, it opens the root's inputs
  one after another.
  """

  PREVIOUS_INPUT_INDEX = 'input_index'

  # NOTE(review): the constructor header and the trailing arguments of the
  # super() call were not visible in the reviewed excerpt; parameter order
  # follows the positional call in from_json, defaults are assumed.
  def __init__(self, index, file_format_root, offset=0, input_index=0):
    """Constructor.

    Args:
      index: the index of this stream within the FileFormatRoot.
      file_format_root: the FileFormatRoot this stream should talk to.
      offset: the offset to start reading current file.
      input_index: index of the next input file to read.
    """
    # These must be set before the base constructor runs, because it
    # immediately asks for the first file.
    self.__inputs = file_format_root._inputs
    self.__input_index = input_index
    self.__previous_input_index = input_index
    self.__file_format_root = file_format_root

    super(_RootFilesStream, self).__init__(index,
                                           file_format_root,
                                           offset,
                                           self.next_file)

  def next_file(self):
    """Open and return the next input file.

    Raises:
      StopIteration: when every input has been handed out.
      ValueError: when an input carries a range but the first format cannot
        split.
    """
    if self.__input_index == len(self.__inputs):
      raise StopIteration()
    file_input = self.__inputs[self.__input_index]
    # NOTE(review): this guard and the '%' operand of the error message
    # were not visible in the reviewed excerpt -- confirm against the full
    # file.
    if file_input.range:
      first_format = self.__file_format_root._formats[0]
      if not first_format.can_split():
        raise ValueError('Input range specified for a non splitable format %s'
                         % first_format.NAME)
      first_format._range = file_input.range
    # Remember which input is being opened so to_json can report it.
    self.__previous_input_index = self.__input_index
    self.__input_index += 1
    return files.open(file_input.filename, 'r', buffering=-1)

  def to_json(self):
    """Extend the base state with the index of the input being read."""
    result = super(_RootFilesStream, self).to_json()
    result[self.PREVIOUS_INPUT_INDEX] = self.__previous_input_index
    return result

  @classmethod
  def from_json(cls, json, file_format_root):
    """Rebuild a root stream from the dict produced by to_json."""
    return cls(json[cls.INDEX],
               file_format_root,
               json[cls.PREVIOUS_OFFSET],
               json[cls.PREVIOUS_INPUT_INDEX])