Bugfix: Don't even offer to save a stream if we're just going to report an error.
[rox-archive.git] / formats.py
blobb8ee613fedb77e2743627704c311851cab34e994
1 import os, sys
2 from support import PipeThroughCommand, escape, Tmp
3 import rox
# The PipeThroughCommand that is currently running, or None when idle.
current_command = None


def pipe_through_command(command, src, dst):
    """Run `command` via PipeThroughCommand and wait for it to finish.

    command -- the command string handed to PipeThroughCommand.
    src     -- file-like input object, or None when the command does not
               take its input from a stream (callers in this file pass
               None for commands that were given a path instead).
    dst     -- output object handed to PipeThroughCommand; may be None.

    While the command runs it is published in the module-level
    `current_command`; the global is always reset to None afterwards, even
    if wait() raises.  Only one command may run at a time.
    """
    global current_command
    assert not current_command

    # Rewind the input if it can be rewound.  This is deliberately
    # best-effort: a non-seekable source is simply used from wherever it is.
    # KeyboardInterrupt/SystemExit are no longer swallowed here.
    if src is not None:
        try:
            src.seek(0)
        except Exception:
            pass

    current_command = PipeThroughCommand(command, src, dst)
    try:
        current_command.wait()
    finally:
        current_command = None
# Registry of every Operation instance, in the order they were created.
operations = []


class Operation:
    """Base class for a conversion that can be offered for some input.

    Creating an instance registers it in the module-level `operations`
    list.  Subclasses are expected to provide `self.command`, which
    save_to_stream() runs.
    """

    # When true, FileData appends '.<extension>' to its suggested name.
    add_extension = False

    def __init__(self, extension):
        """Remember the file extension and add this instance to `operations`."""
        self.extension = extension
        operations.append(self)

    def can_handle(self, data):
        """Return True if `data` is a FileData instance."""
        return isinstance(data, FileData)

    def save_to_stream(self, data, stream):
        """Pipe data.source through self.command, writing to `stream`."""
        source = data.source
        pipe_through_command(self.command, source, stream)
class Compress(Operation):
    """Compress a stream into another stream."""

    # Compressed output gets the operation's extension added to its name.
    add_extension = True

    def __init__(self, extension, command, type):
        """Register a compressor.

        extension -- extension of the compressed file, without the dot.
        command   -- command run by save_to_stream().
        type      -- stored as self.type (the instances created in this
                     file pass MIME type strings such as
                     'application/x-gzip').
        """
        Operation.__init__(self, extension)
        self.type = type
        self.command = command

    def __str__(self):
        """Return the human-readable label for this operation."""
        label = 'Compress as .%s' % self.extension
        return label
class Decompress(Operation):
    """Decompress a stream into another stream."""

    # Every Decompress instance shares this type value.
    type = 'text/plain'

    def __init__(self, extension, command):
        """Register a decompressor.

        extension -- extension of the compressed input, without the dot.
        command   -- command run by save_to_stream().
        """
        Operation.__init__(self, extension)
        self.command = command

    def __str__(self):
        """Return the human-readable label for this operation."""
        label = 'Decompress .%s' % self.extension
        return label
class Extract(Operation):
    """Extract an archive to a directory."""

    type = 'inode/directory'

    def __init__(self, extension, command):
        """If command has a %s then the source path is inserted, else uses stdin."""
        Operation.__init__(self, extension)
        self.command = command

    def __str__(self):
        """Return the human-readable label for this operation."""
        return 'Extract from a .%s' % self.extension

    def save_to_stream(self, data, stream):
        """Always raise: extraction produces a directory, not a stream."""
        raise Exception('This operation creates a directory, so you have '
                        'to drag to a filer window on the local machine')

    def save_to_file(self, data, path):
        """Extract data.source into the directory `path`.

        The directory is created if it does not exist.  The process then
        changes its working directory to it and runs self.command there.
        If the command left the directory empty it is removed again;
        otherwise pull_up() is applied to it.

        Raises Exception if `path` exists and is not a directory.
        """
        if os.path.exists(path):
            if not os.path.isdir(path):
                raise Exception("'%s' already exists and is not a directory!" %
                                path)
        else:
            os.mkdir(path)

        # Resolve everything to absolute paths *before* changing directory;
        # otherwise relative paths would be looked up inside the target.
        target = os.path.abspath(path)
        command = self.command
        source = data.source
        if command.find("'%s'") != -1:
            # The command takes the archive by name rather than on stdin.
            command = command % escape(os.path.abspath(source.name))
            source = None

        os.chdir(target)
        try:
            pipe_through_command(command, source, None)
        finally:
            try:
                os.rmdir(target)  # Will only succeed if it's empty
            except OSError:
                pass
        if os.path.exists(target):
            self.pull_up(target)

    def pull_up(self, path):
        """If `path` contains only a single subdirectory, move its contents up.

        Does nothing when `path` holds more than one entry, or when its only
        entry is not a directory.
        """
        entries = os.listdir(path)
        if len(entries) != 1:
            return
        unneeded_path = os.path.join(path, entries[0])
        if not os.path.isdir(unneeded_path):
            return

        import random
        children = os.listdir(unneeded_path)
        # The subdirectory is renamed out of the way first so that a child
        # with the same name as the subdirectory can be moved up.  Pick a
        # temporary name that no child uses, for the same reason.
        while True:
            tmp_name = 'tmp-' + repr(random.randint(0, 100000))
            if tmp_name not in children:
                break
        tmp_path = os.path.join(path, tmp_name)

        os.rename(unneeded_path, tmp_path)
        for child in children:
            os.rename(os.path.join(tmp_path, child), os.path.join(path, child))
        os.rmdir(tmp_path)
class Archive(Operation):
    """Create an archive from a directory."""

    # Archives get the operation's extension added to their default name.
    add_extension = True

    def __init__(self, extension, command, type):
        """Register an archiver.

        extension -- extension of the archive, without the dot.
        command   -- command template; it must contain "'%s'", which is
                     replaced by the escaped name of the directory.
        type      -- stored as self.type (the instances created in this
                     file pass MIME type strings such as 'application/zip').
        """
        assert command.find("'%s'") != -1

        Operation.__init__(self, extension)
        self.command = command
        self.type = type

    def __str__(self):
        """Return the human-readable label for this operation."""
        return 'Create .%s archive' % self.extension

    def can_handle(self, data):
        """Return True if `data` is a DirData instance."""
        return isinstance(data, DirData)

    def save_to_stream(self, data, stream):
        """Archive the directory data.path, writing the archive to `stream`.

        The process changes its working directory to the parent of the
        directory and passes only the directory's own name to the command.
        """
        # Normalise first: a bare relative name has an empty dirname (and
        # os.chdir('') fails), and a trailing slash gives an empty basename.
        path = os.path.abspath(data.path)
        os.chdir(os.path.dirname(path))
        command = self.command % escape(os.path.basename(path))
        pipe_through_command(command, None, stream)
# Extractors.  A command containing '%s' is given the path of the archive;
# any other command reads the archive from its standard input.
tgz = Extract('tgz', "gunzip -c - | tar xf -")
tbz = Extract('tar.bz2', "bunzip2 -c - | tar xf -")
rar = Extract('rar', "rar x -")
tar = Extract('tar', "tar xf -")
rpm = Extract('rpm', "rpm2cpio - | cpio -id --quiet")
cpio = Extract('cpio', "cpio -id --quiet")
deb = Extract('deb', "ar x '%s'")
zip = Extract('zip', "unzip -q '%s'")
jar = Extract('jar', "unzip -q '%s'")

# Archivers.  '%s' is replaced by the name of the directory to pack.
make_tgz = Archive('tgz', "tar cf - '%s' | gzip", 'application/x-compressed-tar')
Archive('tar.gz', "tar cf - '%s' | gzip", 'application/x-compressed-tar')
Archive('tar.bz2', "tar cf - '%s' | bzip2", 'application/x-bzip-compressed-tar')
Archive('zip', "zip -qr - '%s'", 'application/zip')
Archive('jar', "zip -qr - '%s'", 'application/x-jar')
Archive('tar', "tar cf - '%s'", 'application/x-tar')

# Note: these go afterwards so that .tar.gz matches before .gz
make_gz = Compress('gz', "gzip -c -", 'application/x-gzip')
Compress('bz2', "bzip2 -c -", 'application/x-bzip')
Compress('uue', "uuencode /dev/stdout", 'application/x-uuencoded')

gz = Decompress('gz', "gunzip -c -")
bz2 = Decompress('bz2', "bunzip2 -ck -")
uue = Decompress('uue', "uudecode -o /dev/stdout")

# Can bzip2 read bzip files?

# Alternative spellings of an extension, mapped to the registered spelling.
aliases = {
    'tar.gz': 'tgz',
    'tar.bz': 'tar.bz2',
    'bz': 'bz2',
}

# Every extension handled by some registered operation.  Used as a set:
# only the keys matter, the values are always None.
known_extensions = {}
for x in operations:
    try:
        known_extensions[x.extension] = None
    except AttributeError:
        pass
class FileData:
    """A file on the local filesystem."""

    # st_mode of the file; stays None when the data comes from stdin.
    mode = None

    def __init__(self, path):
        """Open `path` ('-' means standard input) and work out defaults.

        Sets:
          self.path         -- the path as given.
          self.mode         -- st_mode of the file (None for stdin).
          self.source       -- a rewound file object holding the data.  If
                               the input is stdin or cannot be rewound, it
                               is first copied into a Tmp.
          self.default      -- the Operation chosen by guess_format().
          self.default_name -- suggested name for the result.

        If the file cannot be opened the error is reported through
        rox.report_exception() and the process exits with status 1.
        """
        self.path = path

        if path == '-':
            source = sys.stdin
        else:
            try:
                # Binary mode: the content is inspected byte by byte.
                source = open(path, 'rb')
                self.mode = os.stat(path).st_mode
            except Exception:
                rox.report_exception()
                sys.exit(1)

        start = source.read(300)

        # Always copy stdin; otherwise reuse the file if it can be rewound.
        rewound = False
        if source is not sys.stdin:
            try:
                source.seek(0)
                rewound = True
            except (IOError, OSError):
                pass

        if rewound:
            self.source = source
        else:
            # Input is not a regular, local, seekable file, so copy it
            # to a local temp file.
            import shutil
            tmp = Tmp()
            tmp.write(start)
            tmp.flush()
            shutil.copyfileobj(source, tmp)
            tmp.seek(0)
            tmp.flush()
            self.source = tmp

        self.default = self.guess_format(start)

        if path == '-':
            name = 'Data'
        else:
            # Strip the longest known extension from the path.
            name = path
            for ext in known_extensions:
                if path.endswith('.' + ext):
                    stripped = path[:-len(ext) - 1]
                    if len(stripped) < len(name):
                        name = stripped
        if self.default.add_extension:
            name += '.' + self.default.extension
        self.default_name = name

    def _match_short(self, data, offset, match):
        """Return True if the two bytes at `offset` encode the 16-bit value
        `match` in either byte order; False if fewer than two bytes exist."""
        import struct
        pair = data[offset:offset + 2]
        if len(pair) != 2:
            return False
        little = struct.unpack('<H', pair)[0]
        big = struct.unpack('>H', pair)[0]
        return match == little or match == big

    def guess_format(self, data):
        "Return a good default Operation, judging by the first 300 bytes or so."
        def string(offset, match):
            return data[offset:offset + len(match)] == match

        short = self._match_short

        # Archives
        if string(257, 'ustar\0') or string(257, 'ustar\040\040\0'):
            return tar
        # 0x71C7 is octal 070707 and 0xC771 is octal 0143561 (the same two
        # bytes swapped).
        if short(0, 0x71C7) or short(0, 0xC771) or string(0, '070707') or \
           string(0, '070701') or string(0, '070702'):
            return cpio
        if string(0, '!<arch>') or string(0, '\\<ar>') or string(0, '<ar>'):
            if string(7, '\ndebian'):
                return deb
        if string(0, 'Rar!'):
            return rar
        if string(0, 'PK\003\004'):
            return zip
        if string(0, '\xed\xab\xee\xdb'):
            return rpm

        # Compressed streams
        if string(0, '\037\213'):
            if self.path.endswith('.tar.gz') or self.path.endswith('.tgz'):
                return tgz
            return gz
        if string(0, 'BZh') or string(0, 'BZ'):
            if self.path.endswith('.tar.bz') or self.path.endswith('.tar.bz2') or \
               self.path.endswith('.tbz') or self.path.endswith('.tbz2'):
                return tbz
            return bz2
        if string(0, 'begin '):
            return uue

        # Nothing recognised: offer to compress the data instead.
        return make_gz
class DirData:
    """A directory on the local filesystem."""

    # Always None for directories (FileData stores the file's st_mode here).
    mode = None

    def __init__(self, path):
        """Record `path` and choose the defaults for archiving it.

        Sets self.path (as given), self.default (the make_tgz operation) and
        self.default_name (the path plus the default operation's extension).
        """
        self.path = path
        self.default = make_tgz
        # Ignore trailing separators so that 'dir/' suggests 'dir.tgz'
        # rather than 'dir/.tgz'.  A path made only of separators is kept.
        base = path.rstrip(os.sep) or path
        self.default_name = base + '.' + self.default.extension
def test():
    """Round-trip self-test, run when the module is executed as a script.

    First every Compress operation is paired with the Decompress operation
    of the same extension and a small buffer is sent through both.  Then
    every Archive operation that has a matching Extract operation packs and
    unpacks a scratch directory under /tmp.  Any mismatch raises
    AssertionError.
    """
    import shutil

    def read_file(name):
        # Read a whole file and close it again.
        stream = open(name)
        try:
            return stream.read()
        finally:
            stream.close()

    test_data = 'Hello\0World\n'
    src = Tmp()
    src.write(test_data)
    src.flush()
    data = FileData(src.name)
    for comp in operations:
        if not isinstance(comp, Compress):
            continue
        dec = [o for o in operations if isinstance(o, Decompress) and
               o.extension == comp.extension]
        assert len(dec) == 1
        dec = dec[0]
        print("Test %s / %s" % (comp, dec))
        middle = Tmp()
        comp.save_to_stream(data, middle)
        out = Tmp()
        dec.save_to_stream(FileData(middle.name), out)
        del middle
        assert read_file(out.name) == test_data
        print("Passed")
    del src

    src_dir = '/tmp/archive-regression-test'
    out = src_dir + '.out'
    if not os.path.exists(src_dir):
        os.mkdir(src_dir)
    try:
        # Same content the original wrote: the test data plus a newline.
        stream = open(src_dir + '/test', 'w')
        try:
            stream.write(test_data + '\n')
        finally:
            stream.close()
        data = DirData(src_dir)

        for archive in operations:
            if not isinstance(archive, Archive):
                continue
            extract = [o for o in operations if isinstance(o, Extract) and
                       o.extension == archive.extension]
            if not extract:
                print("(skipping %s; no extractor)" % archive)
                continue

            if os.path.exists(out):
                shutil.rmtree(out)

            assert len(extract) == 1
            extract = extract[0]
            print("Test %s / %s" % (archive, extract))

            middle = Tmp()
            archive.save_to_stream(data, middle)
            extract.save_to_file(FileData(middle.name), out)

            # listdir() order is arbitrary, so compare sorted listings.
            assert sorted(os.listdir(src_dir)) == sorted(os.listdir(out))
            assert read_file(src_dir + '/test') == read_file(out + '/test')
            print("Passed")
    finally:
        # Remove the scratch directories even when a check failed.
        shutil.rmtree(src_dir, True)
        if os.path.exists(out):
            shutil.rmtree(out, True)


if __name__ == '__main__':
    test()