Always pass a pathname to 'rar'; it can't handle stdin.
[rox-archive.git] / formats.py
blob1fc5c9622e1791ef608a4080267f4bbd8379be19
1 if __name__ == '__main__':
2 import findrox; findrox.version(1, 99, 11)
3 import os, sys
4 from support import PipeThroughCommand, escape, Tmp
5 import rox
# The PipeThroughCommand currently being waited on, or None when idle.
current_command = None


def pipe_through_command(command, src, dst):
    """Run `command` via PipeThroughCommand and wait for it to finish.

    command -- command string handed unchanged to PipeThroughCommand.
    src     -- input stream, or None when the command takes no input
               stream (callers in this file pass None in that case).
    dst     -- output stream, or None (callers in this file pass None when
               the command writes its results itself).

    Only one command may be in flight at a time; the module-level
    `current_command` holds it while waiting and is reset to None
    afterwards, even if wait() raises.
    """
    global current_command
    assert not current_command

    # Rewind so the command sees the stream from its start.  This is
    # best-effort: some streams cannot seek.  KeyboardInterrupt and
    # SystemExit are deliberately not swallowed here.
    if src is not None:
        try:
            src.seek(0)
        except Exception:
            pass

    current_command = PipeThroughCommand(command, src, dst)
    try:
        current_command.wait()
    finally:
        current_command = None
# Every Operation registers itself here on construction, in creation order.
operations = []


class Operation:
    """Base class for something that can be done with a piece of data.

    Subclasses are expected to set `self.command` (used by
    save_to_stream) in their own constructor.
    """

    # When true, FileData appends '.' + extension to the suggested name.
    add_extension = False

    def __init__(self, extension):
        """Remember `extension` and add this instance to `operations`."""
        self.extension = extension
        operations.append(self)

    def can_handle(self, data):
        """Return whether this operation applies to `data` (FileData only)."""
        return isinstance(data, FileData)

    def save_to_stream(self, data, stream):
        """Pass self.command, data.source and `stream` to pipe_through_command."""
        pipe_through_command(self.command, data.source, stream)
class Compress(Operation):
    "Compress a stream into another stream."

    # The compressed output gets the operation's extension appended.
    add_extension = True

    def __init__(self, extension, command, type):
        """Register a compressor.

        extension -- file extension of the compressed form, without the dot.
        command   -- command string later run by save_to_stream.
        type      -- type string for the result (e.g. 'application/x-gzip').
        """
        self.command = command
        self.type = type
        Operation.__init__(self, extension)

    def __str__(self):
        """Human-readable label for this operation."""
        return 'Compress as .%s' % self.extension
class Decompress(Operation):
    "Decompress a stream into another stream."

    # Type string reported for the decompressed result.
    type = 'text/plain'

    def __init__(self, extension, command):
        """Register a decompressor.

        extension -- file extension of the compressed form, without the dot.
        command   -- command string later run by save_to_stream.
        """
        self.command = command
        Operation.__init__(self, extension)

    def __str__(self):
        """Human-readable label for this operation."""
        return 'Decompress .%s' % self.extension
class Extract(Operation):
    "Extract an archive to a directory."

    type = 'inode/directory'

    def __init__(self, extension, command):
        "If command has a '%s' then the source path is inserted, else uses stdin."
        Operation.__init__(self, extension)
        self.command = command

    def __str__(self):
        """Human-readable label for this operation."""
        return 'Extract from a .%s' % self.extension

    def save_to_stream(self, data, stream):
        """Always raises: extraction produces a directory, not a stream."""
        raise Exception('This operation creates a directory, so you have '
                        'to drag to a filer window on the local machine')

    def save_to_file(self, data, path):
        """Extract data.source into the directory `path`.

        `path` is created if missing; an Exception is raised if it exists
        but is not a directory.  The process working directory is changed
        to `path` and is NOT restored afterwards.  If the extraction leaves
        `path` empty it is removed again; if it leaves exactly one
        subdirectory, that subdirectory's contents are moved up one level.
        """
        if not os.path.exists(path):
            os.mkdir(path)
        elif not os.path.isdir(path):
            raise Exception("'%s' already exists and is not a directory!" %
                            path)

        # Resolve everything to absolute paths *before* changing directory;
        # otherwise a relative `path` or source name would be looked up
        # relative to the new working directory and miss.
        target = os.path.abspath(path)
        command = self.command
        source = data.source
        if "'%s'" in command:
            command = command % escape(os.path.abspath(source.name))
            source = None

        os.chdir(target)
        try:
            pipe_through_command(command, source, None)
        finally:
            try:
                os.rmdir(target)    # Will only succeed if it's empty
            except OSError:
                pass
        if os.path.exists(target):
            self.pull_up(target)

    def pull_up(self, path):
        """If `path` holds only a single real subdirectory, move it up.

        The subdirectory is first renamed to a temporary name so that a
        child with the same name as the subdirectory itself can be moved
        into `path` without clashing.
        """
        entries = os.listdir(path)
        if len(entries) != 1:
            return
        unneeded_path = os.path.join(path, entries[0])
        # A symlink to a directory must not be followed: moving "its"
        # children would relocate files belonging to the link target.
        if os.path.islink(unneeded_path) or not os.path.isdir(unneeded_path):
            return

        import random
        children = os.listdir(unneeded_path)
        # Pick a temporary name that none of the children uses, so moving
        # the children up cannot collide with the temporary directory.
        tmp_name = 'tmp-' + repr(random.randint(0, 100000))
        while tmp_name in children:
            tmp_name = 'tmp-' + repr(random.randint(0, 100000))
        tmp_path = os.path.join(path, tmp_name)

        os.rename(unneeded_path, tmp_path)
        for child in children:
            os.rename(os.path.join(tmp_path, child), os.path.join(path, child))
        os.rmdir(tmp_path)
class Archive(Operation):
    "Create an archive from a directory."

    # The archive gets the operation's extension appended to its name.
    add_extension = True

    def __init__(self, extension, command, type):
        """Register an archiver.

        extension -- file extension of the archive, without the dot.
        command   -- command string; must contain "'%s'", which is replaced
                     by the escaped name of the directory to archive.
        type      -- type string for the result (e.g. 'application/x-tar').
        """
        assert "'%s'" in command

        Operation.__init__(self, extension)
        self.command = command
        self.type = type

    def __str__(self):
        """Human-readable label for this operation."""
        return 'Create .%s archive' % self.extension

    def can_handle(self, data):
        """Return whether this operation applies to `data` (DirData only)."""
        return isinstance(data, DirData)

    def save_to_stream(self, data, stream):
        """Archive the directory data.path, writing the archive to `stream`.

        The process working directory is changed to the directory's parent
        (and not restored) so the command can refer to it by its base name.
        """
        # Normalise first: a bare relative name has an empty dirname (which
        # chdir rejects) and a trailing slash gives an empty basename.
        parent, name = os.path.split(os.path.abspath(data.path))
        os.chdir(parent)
        command = self.command % escape(name)
        pipe_through_command(command, None, stream)
# Extractors.  A command containing '%s' is given the source file's path;
# the others are fed the archive on stdin.  'rar' must always get a
# pathname because it can't handle stdin.
tgz = Extract('tgz', "gunzip -c - | tar xf -")
tbz = Extract('tar.bz2', "bunzip2 -c - | tar xf -")
rar = Extract('rar', "rar x '%s'")
tar = Extract('tar', "tar xf -")
rpm = Extract('rpm', "rpm2cpio - | cpio -id --quiet")
cpio = Extract('cpio', "cpio -id --quiet")
deb = Extract('deb', "ar x '%s'")
zip = Extract('zip', "unzip -q '%s'")
jar = Extract('jar', "unzip -q '%s'")

# Archivers.  '%s' is replaced by the name of the directory to pack.
make_tgz = Archive('tgz', "tar cf - '%s' | gzip", 'application/x-compressed-tar')
Archive('tar.gz', "tar cf - '%s' | gzip", 'application/x-compressed-tar')
Archive('tar.bz2', "tar cf - '%s' | bzip2", 'application/x-bzip-compressed-tar')
Archive('zip', "zip -qr - '%s'", 'application/zip')
Archive('jar', "zip -qr - '%s'", 'application/x-jar')
Archive('tar', "tar cf - '%s'", 'application/x-tar')

# Note: these go afterwards so that .tar.gz matches before .gz
make_gz = Compress('gz', "gzip -c -", 'application/x-gzip')
Compress('bz2', "bzip2 -c -", 'application/x-bzip')
Compress('uue', "uuencode /dev/stdout", 'application/x-uuencoded')

# Stream decompressors; each reads stdin and writes stdout.
gz = Decompress('gz', "gunzip -c -")
bz2 = Decompress('bz2', "bunzip2 -ck -")
uue = Decompress('uue', "uudecode -o /dev/stdout")
# Alternative spellings of an extension, mapped to the extension under
# which the operation is registered.
# NOTE(review): not used within this file; presumably consulted by callers.
# Can bzip2 read bzip files?
aliases = {
    'tar.gz': 'tgz',
    'tar.bz': 'tar.bz2',
    'bz': 'bz2',
}
# Set-like dict (values are always None) of every extension that some
# registered operation declares.  Operations lacking an `extension`
# attribute are skipped.
known_extensions = {}
for op in operations:
    try:
        ext = op.extension
    except AttributeError:
        continue
    known_extensions[ext] = None
class FileData:
    "A file on the local filesystem."

    # st_mode of the file; stays None when reading from stdin.
    mode = None

    def __init__(self, path):
        """Open `path` ('-' means stdin) and work out a default operation.

        Sets:
          path         -- the path as given.
          mode         -- os.stat(path).st_mode (not set for stdin).
          source       -- a stream positioned at the start of the data; a
                          Tmp copy when the input is stdin or cannot seek.
          default      -- the Operation chosen by guess_format().
          default_name -- suggested name for the result of `default`.

        If the file cannot be opened the error is reported through
        rox.report_exception() and the process exits with status 1.
        """
        self.path = path

        if path == '-':
            source = sys.stdin
        else:
            try:
                source = open(path, 'rb')
                self.mode = os.stat(path).st_mode
            except Exception:
                rox.report_exception()
                sys.exit(1)

        start = source.read(300)

        # stdin is always copied; anything else is used directly only if
        # it can be rewound after the peek above.
        seekable = False
        if source is not sys.stdin:
            try:
                source.seek(0)
                seekable = True
            except Exception:
                pass

        if seekable:
            self.source = source
        else:
            # Input is not a regular, local, seekable file, so copy it
            # to a local temp file.
            import shutil
            tmp = Tmp()
            tmp.write(start)
            tmp.flush()
            shutil.copyfileobj(source, tmp)
            tmp.seek(0)
            tmp.flush()
            self.source = tmp

        self.default = self.guess_format(start)

        if path == '-':
            name = 'Data'
        else:
            # Strip the longest known extension from the path.
            name = path
            for ext in known_extensions:
                if path.endswith('.' + ext):
                    stripped = path[:-len(ext) - 1]
                    if len(stripped) < len(name):
                        name = stripped
        if self.default.add_extension:
            name += '.' + self.default.extension
        self.default_name = name

    def guess_format(self, data):
        "Return a good default Operation, judging by the first 300 bytes or so."
        size = len(data)

        def string(offset, match):
            # True if `match` occurs in `data` at `offset`.
            return data[offset:offset + len(match)] == match

        def byte(index):
            # Numeric value of the byte at `index`.  Indexing yields a
            # one-character string for str data and an int for bytes data.
            value = data[index]
            if isinstance(value, int):
                return value
            return ord(value)

        def short(offset, match):
            # True if the two bytes at `offset` hold the 16-bit value
            # `match` in either byte order.
            if size <= offset + 1:
                return False
            first = byte(offset)
            second = byte(offset + 1)
            low = match & 0xff
            high = match >> 8
            return (first == low and second == high) or \
                   (second == low and first == high)

        # Archives
        if string(257, 'ustar\0') or string(257, 'ustar\040\040\0'):
            return tar
        # 0x71C7 is octal 070707 and 0xC771 is octal 0143561 (the same two
        # bytes swapped); the string forms are the ASCII magics.
        if short(0, 0x71C7) or short(0, 0xC771) or string(0, '070707') or \
           string(0, '070701') or string(0, '070702'):
            return cpio
        if string(0, '!<arch>') or string(0, '\\<ar>') or string(0, '<ar>'):
            if string(7, '\ndebian'):
                return deb
        if string(0, 'Rar!'):
            return rar
        if string(0, 'PK\003\004'):
            return zip
        if string(0, '\xed\xab\xee\xdb'):
            return rpm

        # Compressed streams
        if string(0, '\037\213'):
            if self.path.endswith('.tar.gz') or self.path.endswith('.tgz'):
                return tgz
            return gz
        # 'BZ' also covers the more specific 'BZh' prefix.
        if string(0, 'BZ'):
            if self.path.endswith('.tar.bz') or \
               self.path.endswith('.tar.bz2') or \
               self.path.endswith('.tbz') or self.path.endswith('.tbz2'):
                return tbz
            return bz2
        if string(0, 'begin '):
            return uue

        # Nothing recognised: offer to gzip it.
        return make_gz
class DirData:
    "A directory on the local filesystem."

    # Directories carry no mode; same attribute name as FileData.mode.
    mode = None

    def __init__(self, path):
        """Record `path` and default to packing it with make_tgz.

        Sets:
          path         -- the path as given.
          default      -- make_tgz.
          default_name -- the directory path plus '.' and the default
                          operation's extension.
        """
        self.path = path
        self.default = make_tgz
        # Drop trailing separators so the extension is appended to the
        # directory's own name ('dir.tgz') rather than naming a hidden file
        # inside it ('dir/.tgz').  A path made only of separators is kept.
        base = path.rstrip(os.sep) or path
        self.default_name = base + '.' + self.default.extension
def test():
    """Round-trip self-test for the registered operations.

    First every Compress operation is paired with the Decompress operation
    of the same extension and a small payload is pushed through both.  Then
    every Archive operation that has an Extract counterpart packs and
    unpacks a scratch directory.  Failures surface as AssertionError.
    """
    import shutil
    import tempfile

    def read_file(name):
        # Return the full contents of `name`, closing the file promptly.
        stream = open(name)
        try:
            return stream.read()
        finally:
            stream.close()

    test_data = 'Hello\0World\n'
    src = Tmp()
    src.write(test_data)
    src.flush()
    data = FileData(src.name)
    for comp in operations:
        if not isinstance(comp, Compress):
            continue
        dec = [o for o in operations if isinstance(o, Decompress) and
               o.extension == comp.extension]
        assert len(dec) == 1
        dec = dec[0]
        print("Test %s / %s" % (comp, dec))
        middle = Tmp()
        comp.save_to_stream(data, middle)
        out = Tmp()
        dec.save_to_stream(FileData(middle.name), out)
        del middle
        assert read_file(out.name) == test_data
        print("Passed")
    del src

    # Work inside a freshly created private directory instead of a fixed,
    # predictable name under /tmp, and always clean it up afterwards.
    root = tempfile.mkdtemp(prefix='archive-regression-test-')
    try:
        dir = os.path.join(root, 'archive-regression-test')
        out = dir + '.out'
        os.mkdir(dir)
        sample = open(dir + '/test', 'w')
        try:
            sample.write(test_data + '\n')
        finally:
            sample.close()
        data = DirData(dir)

        for archive in operations:
            if not isinstance(archive, Archive):
                continue
            extract = [o for o in operations if isinstance(o, Extract) and
                       o.extension == archive.extension]
            if not extract:
                print("(skipping %s; no extractor)" % archive)
                continue

            # Start each round from a clean output directory.
            if os.path.exists(out):
                shutil.rmtree(out)

            assert len(extract) == 1
            extract = extract[0]
            print("Test %s / %s" % (archive, extract))

            middle = Tmp()
            archive.save_to_stream(data, middle)
            extract.save_to_file(FileData(middle.name), out)

            assert os.listdir(dir) == os.listdir(out)
            assert read_file(dir + '/test') == read_file(out + '/test')
            print("Passed")
    finally:
        shutil.rmtree(root, ignore_errors=True)


if __name__ == '__main__':
    test()