Added Chinese translation (Babyfai Cheung).
[rox-archive.git] / formats.py
blob a7a66c99408fc9b4b0471c3aa902ca8125ff4e8c
1 if __name__ == '__main__':
2 import findrox; findrox.version(1, 99, 11)
3 import os, sys
4 from support import PipeThroughCommand, escape, Tmp
5 import rox
# The PipeThroughCommand currently being waited on, or None when idle.
current_command = None


def pipe_through_command(command, src, dst):
    """Run command through PipeThroughCommand and block until it finishes.

    command, src and dst are handed unchanged to PipeThroughCommand
    (presumably the command line, its input stream and its output stream --
    confirm against support.py).  Callers in this file pass None for src or
    dst when there is no stream on that side.

    While the command runs it is published in the module-level
    current_command; that is reset to None afterwards even if wait() raises.
    Only one command may be in flight at a time (checked with an assert).
    """
    global current_command
    assert not current_command

    # Best effort: rewind the input so the command sees it from the start.
    # src may be None (AttributeError), unseekable (IOError/OSError) or
    # already closed (ValueError); none of those is fatal here.  Anything
    # else, including KeyboardInterrupt, is allowed to propagate.
    try:
        src.seek(0)
    except (AttributeError, IOError, OSError, ValueError):
        pass

    current_command = PipeThroughCommand(command, src, dst)
    try:
        current_command.wait()
    finally:
        current_command = None
# Every Operation ever constructed, in creation order; instances add
# themselves from Operation.__init__.
operations = []


class Operation:
    "Base class for one conversion that can be applied to some input data."

    # When true, the operation's extension is appended to the suggested
    # output name (see FileData.__init__).
    add_extension = False

    def __init__(self, extension):
        "Remember the extension and register this instance in `operations`."
        self.extension = extension
        operations.append(self)

    def can_handle(self, data):
        "Return true if `data` is a FileData."
        return isinstance(data, FileData)

    def save_to_stream(self, data, stream):
        "Pipe data.source through self.command, passing `stream` as the output."
        pipe_through_command(self.command, data.source, stream)
class Compress(Operation):
    "Compress a stream into another stream."

    # The compressed result gets the operation's extension appended to its
    # suggested name.
    add_extension = True

    def __init__(self, extension, command, type):
        "Register the operation; `command` does the work, `type` labels the result."
        Operation.__init__(self, extension)
        self.type = type
        self.command = command

    def __str__(self):
        "Translated, human-readable label for this operation."
        label = _('Compress as .%s')
        return label % self.extension
class Decompress(Operation):
    "Decompress a stream into another stream."

    # Type reported for the decompressed output.
    type = 'text/plain'

    def __init__(self, extension, command):
        "Register the operation; `command` is the command that does the work."
        Operation.__init__(self, extension)
        self.command = command

    def __str__(self):
        "Translated, human-readable label for this operation."
        label = _('Decompress .%s')
        return label % self.extension
class Extract(Operation):
    "Extract an archive to a directory."

    # Type reported for the result of an extraction.
    type = 'inode/directory'

    def __init__(self, extension, command):
        "If command has a %s then the source path is inserted, else uses stdin."
        Operation.__init__(self, extension)
        self.command = command

    def __str__(self):
        "Translated, human-readable label for this operation."
        return _('Extract from a .%s') % self.extension

    def save_to_stream(self, data, stream):
        "Always raises: an extraction produces a directory, not a stream."
        raise Exception(_('This operation creates a directory, so you have '
                          'to drag to a filer window on the local machine'))

    def _absolute(self, path):
        "Return `path` anchored at the current directory if it is relative."
        if os.path.isabs(path):
            return path
        return os.path.join(os.getcwd(), path)

    def save_to_file(self, data, path):
        """Extract data.source into the directory `path`.

        The directory is created if it does not exist; an existing
        non-directory at `path` raises Exception.  The process's working
        directory is changed to the target directory and left there.  After
        the command finishes an empty target directory is removed again, and
        a target holding a single subdirectory has that subdirectory's
        contents moved up (see pull_up).
        """
        # Resolve both paths BEFORE the chdir below; afterwards a relative
        # path would be looked up inside the target directory itself.
        target = self._absolute(path)

        if os.path.exists(target):
            if not os.path.isdir(target):
                raise Exception(_("'%s' already exists and is not a directory!") %
                                path)
        else:
            os.mkdir(target)

        command = self.command
        source = data.source
        if command.find("'%s'") != -1:
            # The command takes the archive's path itself instead of stdin.
            command = command % escape(self._absolute(source.name))
            source = None

        os.chdir(target)
        try:
            pipe_through_command(command, source, None)
        finally:
            try:
                os.rmdir(target)    # Will only succeed if it's empty
            except OSError:
                pass
        if os.path.exists(target):
            self.pull_up(target)

    def pull_up(self, path):
        """If `path` holds exactly one real subdirectory, move its contents up.

        Does nothing when `path` has more or fewer than one entry, or when
        that entry is not a directory.  A symlink to a directory is left
        alone too: following it would move files out of the link's target.
        """
        entries = os.listdir(path)
        if len(entries) != 1:
            return
        unneeded_path = os.path.join(path, entries[0])
        if os.path.islink(unneeded_path) or not os.path.isdir(unneeded_path):
            return

        import random
        children = os.listdir(unneeded_path)
        # Rename the subdirectory out of the way first, so a child with the
        # same name as its parent can be moved up.  The temporary name must
        # not match any child either, or that child could not be moved.
        tmp_name = 'tmp-' + str(random.randint(0, 100000))
        while tmp_name in children:
            tmp_name = 'tmp-' + str(random.randint(0, 100000))
        tmp_path = os.path.join(path, tmp_name)

        os.rename(unneeded_path, tmp_path)
        for child in children:
            os.rename(os.path.join(tmp_path, child), os.path.join(path, child))
        os.rmdir(tmp_path)
class Archive(Operation):
    "Create an archive from a directory."

    # The archive gets the operation's extension appended to its suggested
    # name.
    add_extension = True

    def __init__(self, extension, command, type):
        "`command` must contain '%s', which is replaced by the directory name."
        assert command.find("'%s'") != -1

        Operation.__init__(self, extension)
        self.command = command
        self.type = type

    def __str__(self):
        "Translated, human-readable label for this operation."
        return _('Create .%s archive') % self.extension

    def can_handle(self, data):
        "Return true if `data` is a DirData."
        return isinstance(data, DirData)

    def save_to_stream(self, data, stream):
        """Archive the directory data.path, passing `stream` as the output.

        The process's working directory is changed to the parent of the
        directory and the command is given only the directory's own name.
        """
        # A trailing separator would make basename() return '' and the
        # command would be given an empty name.
        path = data.path.rstrip(os.sep) or data.path
        parent = os.path.dirname(path)
        # No directory part means the directory lives in the current working
        # directory already; os.chdir('') would raise.
        if parent:
            os.chdir(parent)
        command = self.command % escape(os.path.basename(path))
        pipe_through_command(command, None, stream)
# Extractors.  A command containing '%s' is given the archive's path;
# the others read the archive from stdin (see Extract.__init__).
tgz = Extract('tgz', command="gunzip -c - | tar xf -")
tbz = Extract('tar.bz2', command="bunzip2 -c - | tar xf -")
tarz = Extract('tar.Z', command="uncompress -c - | tar xf -")
rar = Extract('rar', command="rar x '%s'")
tar = Extract('tar', command="tar xf -")
rpm = Extract('rpm', command="rpm2cpio - | cpio -id --quiet")
cpio = Extract('cpio', command="cpio -id --quiet")
deb = Extract('deb', command="ar x '%s'")
zip = Extract('zip', command="unzip -q '%s'")
jar = Extract('jar', command="unzip -q '%s'")
# Archivers.  Each instance registers itself in `operations` when it is
# constructed, so only the one needed by name elsewhere is bound.
make_tgz = Archive('tgz', "tar cf - '%s' | gzip", 'application/x-compressed-tar')
Archive('tar.gz', "tar cf - '%s' | gzip", 'application/x-compressed-tar')
Archive('tar.bz2', "tar cf - '%s' | bzip2", 'application/x-bzip-compressed-tar')
Archive('zip', "zip -qr - '%s'", 'application/zip')
Archive('jar', "zip -qr - '%s'", 'application/x-jar')
Archive('tar', "tar cf - '%s'", 'application/x-tar')
# Note: these go afterwards so that .tar.gz matches before .gz
make_gz = Compress('gz', command="gzip -c -", type='application/x-gzip')
Compress('bz2', command="bzip2 -c -", type='application/x-bzip')
Compress('uue', command="uuencode /dev/stdout", type='application/x-uuencoded')

# Stream decompressors, one per compressed-stream extension.
gz = Decompress('gz', command="gunzip -c -")
bz2 = Decompress('bz2', command="bunzip2 -ck -")
uue = Decompress('uue', command="uudecode -o /dev/stdout")
z = Decompress('Z', command="uncompress -c -")
# Can bzip2 read bzip files?

# Alternative spellings of an extension, mapped to the spelling under which
# an operation is registered in this module.
aliases = {
    'tar.gz': 'tgz',
    'tar.bz': 'tar.bz2',
    'tbz': 'tar.bz2',
    'bz': 'bz2',
}
# Every extension some registered operation declares.  Used as a set: only
# the keys matter, the values are always None.
known_extensions = {}
for x in operations:
    # Skip anything in the registry that carries no extension attribute.
    if hasattr(x, 'extension'):
        known_extensions[x.extension] = None
class FileData:
    "A file on the local filesystem."

    # st_mode of the file; stays None when the data comes from stdin.
    mode = None

    def __init__(self, path):
        """Open `path` ('-' means stdin) and work out what to do with it.

        Sets:
          path         -- the path as given
          mode         -- st_mode of the file (left as None for stdin)
          source       -- a stream positioned at the start of the data
          default      -- the Operation chosen by guess_format()
          default_name -- suggested name for the result of that operation

        If the file cannot be opened or stat'ed the error is reported with
        rox.report_exception() and the process exits with status 1.
        """
        self.path = path

        if path == '-':
            source = sys.stdin
        else:
            try:
                source = open(path)
                self.mode = os.stat(path).st_mode
            except Exception:
                rox.report_exception()
                sys.exit(1)

        start = source.read(300)

        # stdin is always copied; anything else is used directly only if it
        # can be rewound after the peek above.
        seekable = source is not sys.stdin
        if seekable:
            try:
                source.seek(0)
            except Exception:
                seekable = False

        if seekable:
            self.source = source
        else:
            # Input is not a regular, local, seekable file, so copy it
            # to a local temp file.
            import shutil
            tmp = Tmp()
            tmp.write(start)
            tmp.flush()
            shutil.copyfileobj(source, tmp)
            tmp.seek(0)
            tmp.flush()
            self.source = tmp
        self.default = self.guess_format(start)

        if path == '-':
            name = 'Data'
        else:
            # Strip the longest known extension from the end of the path.
            name = path
            for ext in known_extensions:
                if path.endswith('.' + ext):
                    new = path[:-len(ext) - 1]
                    if len(new) < len(name):
                        name = new
        if self.default.add_extension:
            name += '.' + self.default.extension
        self.default_name = name

    def guess_format(self, data):
        "Return a good default Operation, judging by the first 300 bytes or so."
        l = len(data)

        def string(offset, match):
            "True if `match` occurs in data at `offset`."
            return data[offset:offset + len(match)] == match

        def short(offset, match):
            "True if the 16-bit value `match` is at `offset`, in either byte order."
            if l > offset + 1:
                # ord() on one-element slices: indexing a string yields a
                # character, which never compares equal to an integer.
                a = ord(data[offset:offset + 1])
                b = ord(data[offset + 1:offset + 2])
                low = match & 0xff
                high = match >> 8
                return (a == low and b == high) or (b == low and a == high)
            return False

        # Archives
        if string(257, 'ustar\0') or string(257, 'ustar\040\040\0'):
            return tar
        # 0x71c7 is octal 070707 and 0xc771 is octal 0143561.
        if short(0, 0x71c7) or short(0, 0xc771) or string(0, '070707') or \
           string(0, '070701') or string(0, '070702'):
            return cpio
        if string(0, '!<arch>') or string(0, '\\<ar>') or string(0, '<ar>'):
            if string(7, '\ndebian'):
                return deb
        if string(0, 'Rar!'):
            return rar
        if string(0, 'PK\003\004'):
            return zip
        if string(0, 'PK00'):
            return zip
        if string(0, '\xed\xab\xee\xdb'):
            return rpm

        # Compressed streams
        if string(0, '\037\213'):
            if self.path.endswith('.tar.gz') or self.path.endswith('.tgz'):
                return tgz
            return gz
        if string(0, 'BZh') or string(0, 'BZ'):
            if self.path.endswith('.tar.bz') or self.path.endswith('.tar.bz2') or \
               self.path.endswith('.tbz') or self.path.endswith('.tbz2'):
                return tbz
            return bz2
        if string(0, 'begin '):
            return uue
        if string(0, '\037\235'):
            if self.path.endswith('.tar.Z'):
                return tarz
            return z

        # Nothing recognised: offer to compress the data instead.
        return make_gz
class DirData:
    "A directory on the local filesystem."

    # Directories carry no file mode here.
    mode = None

    def __init__(self, path):
        """Record `path`; the default operation is to make a .tgz of it.

        Sets path (as given), default (make_tgz) and default_name (the path
        with the default operation's extension appended).
        """
        self.path = path
        self.default = make_tgz
        # Drop trailing separators first, otherwise 'dir/' would suggest the
        # hidden file 'dir/.tgz' inside the directory being archived.
        base = path.rstrip(os.sep) or path
        self.default_name = base + '.' + self.default.extension
def test():
    """Round-trip regression test for the registered operations.

    Every Compress operation is paired with the Decompress operation of the
    same extension, and every Archive operation with the Extract operation
    of the same extension (archives without an extractor are skipped).  Each
    pair must reproduce the original data.  Runs the real external commands
    and uses /tmp/archive-regression-test and /tmp/archive-regression-test.out
    as scratch space.
    """
    def read_file(path):
        "Return the contents of `path`, closing the file afterwards."
        stream = open(path)
        try:
            return stream.read()
        finally:
            stream.close()

    test_data = 'Hello\0World\n'
    src = Tmp()
    src.write(test_data)
    src.flush()
    data = FileData(src.name)
    for comp in operations:
        if not isinstance(comp, Compress):
            continue
        dec = [o for o in operations if isinstance(o, Decompress) and
               o.extension == comp.extension]
        assert len(dec) == 1
        dec = dec[0]
        print("Test %s / %s" % (comp, dec))
        middle = Tmp()
        comp.save_to_stream(data, middle)
        out = Tmp()
        dec.save_to_stream(FileData(middle.name), out)
        del middle
        assert read_file(out.name) == test_data
        print("Passed")
    del src

    src_dir = '/tmp/archive-regression-test'
    out = src_dir + '.out'
    if not os.path.exists(src_dir):
        os.mkdir(src_dir)
    stream = open(src_dir + '/test', 'w')
    try:
        # One extra newline, as the print statement this replaces added one.
        stream.write(test_data + '\n')
    finally:
        stream.close()
    data = DirData(src_dir)

    for archive in operations:
        if not isinstance(archive, Archive):
            continue
        extract = [o for o in operations if isinstance(o, Extract) and
                   o.extension == archive.extension]
        if not extract:
            print("(skipping %s; no extractor)" % archive)
            continue

        # Start every round from a clean output directory.
        if os.path.exists(out):
            os.system("rm -r '%s'" % out)

        assert len(extract) == 1
        extract = extract[0]
        print("Test %s / %s" % (archive, extract))

        middle = Tmp()
        archive.save_to_stream(data, middle)
        extract.save_to_file(FileData(middle.name), src_dir + '.out')

        assert os.listdir(src_dir) == os.listdir(out)
        assert read_file(src_dir + '/test') == read_file(out + '/test')
        print("Passed")

    os.unlink(src_dir + '/test')
    os.rmdir(src_dir)
    if os.path.exists(out):
        os.system("rm -r '%s'" % out)
if __name__ == '__main__':
    # Install the translation function as the builtin `_` (the classes above
    # call it to build their labels), then run the regression test.
    _messages_dir = os.path.join(rox.app_dir, 'Messages')
    __builtins__._ = rox.i18n.translation(_messages_dir)
    test()