Moved unit tests to a separate directory (Thomas Leonard).
[rox-archive.git] / formats.py
blob1fbf7e6de1de8d08875ced6ccefa0ec5fc80e005
1 if __name__ == '__main__':
2 import findrox; findrox.version(1, 99, 11)
3 import os, sys
4 from support import shell_escape, Tmp
5 import rox
6 from rox.processes import PipeThroughCommand
# The PipeThroughCommand currently running, or None when nothing is running.
current_command = None


def pipe_through_command(command, src, dst):
    """Run `command` with `src` as its input and `dst` as its output.

    command -- command string handed to PipeThroughCommand.
    src     -- input stream, or None when the command needs no input stream
               (callers in this file pass None when the command names its
               source file itself).
    dst     -- output stream, or None.

    While the command runs it is published in the module-level
    `current_command`; the global is always reset to None afterwards, even
    if waiting raises. Only one command may run at a time.
    """
    global current_command
    assert not current_command

    # Rewind the input if we can. Pipes and other unseekable streams are
    # fine: this is best-effort, so seek failures are ignored.
    if src is not None:
        try:
            src.seek(0)
        except (AttributeError, EnvironmentError, ValueError):
            pass

    current_command = PipeThroughCommand(command, src, dst)
    try:
        current_command.wait()
    finally:
        current_command = None
# Every Operation adds itself to this list when constructed, so the list
# order is the order in which operations were created.
operations = []


class Operation:
    """Base class for something that can be done to a piece of input data.

    Subclasses are expected to provide a `command` attribute, which the
    default save_to_stream() runs.
    """

    # Whether the operation's extension should be appended to the default
    # output name.
    add_extension = False

    def __init__(self, extension):
        self.extension = extension
        operations.append(self)

    def can_handle(self, data):
        "Return whether this operation applies to `data` (files by default)."
        return isinstance(data, FileData)

    def save_to_stream(self, data, stream):
        "Pipe the data's source through this operation's command into `stream`."
        pipe_through_command(self.command, data.source, stream)
class Compress(Operation):
    "Compress a stream into another stream."

    # Compressed output gets the operation's extension added to its name.
    add_extension = True

    def __init__(self, extension, command, type):
        """Register a compressor.

        extension -- file extension of the compressed output (no leading dot).
        command   -- command string that performs the compression.
        type      -- MIME type string of the result.
        """
        Operation.__init__(self, extension)
        self.command, self.type = command, type

    def __str__(self):
        label = _('Compress as .%s')
        return label % self.extension
class Decompress(Operation):
    "Decompress a stream into another stream."

    # MIME type reported for the decompressed result.
    type = 'text/plain'

    def __init__(self, extension, command):
        """Register a decompressor.

        extension -- file extension of the compressed input (no leading dot).
        command   -- command string that performs the decompression.
        """
        Operation.__init__(self, extension)
        self.command = command

    def __str__(self):
        label = _('Decompress .%s')
        return label % self.extension
class Extract(Operation):
    "Extract an archive to a directory."

    # MIME type of the result: extraction always produces a directory.
    type = 'inode/directory'

    def __init__(self, extension, command):
        "If command has a %s then the source path is inserted, else uses stdin."
        Operation.__init__(self, extension)
        self.command = command

    def __str__(self):
        return _('Extract from a .%s') % self.extension

    def save_to_stream(self, data, stream):
        "Always fails: a directory cannot be written to a stream."
        raise Exception(_('This operation creates a directory, so you have '
                          'to drag to a filer window on the local machine'))

    def save_to_file(self, data, path):
        """Extract the archive held by `data` into the directory `path`.

        The directory is created if missing. The process's working directory
        is changed to `path` (and left there). If the extraction leaves the
        directory empty it is removed again; if it leaves exactly one
        sub-directory, that directory's contents are pulled up one level.

        Raises Exception if `path` exists but is not a directory.
        """
        if os.path.exists(path):
            if not os.path.isdir(path):
                raise Exception(
                    _("'%s' already exists and is not a directory!") % path)
        else:
            os.mkdir(path)

        command = self.command
        source = data.source
        if "'%s'" in command:
            # The tool wants a file name rather than stdin. Resolve it before
            # changing directory, or a relative name would stop being valid.
            source_name = os.path.abspath(source.name)
            command = command % shell_escape(source_name)
            source = None

        # Likewise pin down the target, since it is used again after chdir.
        path = os.path.abspath(path)
        os.chdir(path)

        try:
            pipe_through_command(command, source, None)
        finally:
            try:
                os.rmdir(path)  # Will only succeed if it's empty
            except OSError:
                pass

        # NOTE(review): the original indentation was lost; this is assumed to
        # run only after a successful extraction (outside the finally).
        if os.path.exists(path):
            self.pull_up(path)

    def pull_up(self, path):
        """If `path` holds only a single sub-directory, move its contents up.

        Does nothing when `path` contains anything other than exactly one
        real directory. A lone symbolic link is left alone, so that the
        contents of whatever it points to are never moved.
        """
        entries = os.listdir(path)
        if len(entries) != 1:
            return

        unneeded_path = os.path.join(path, entries[0])
        if os.path.islink(unneeded_path) or not os.path.isdir(unneeded_path):
            return

        import random

        # Rename the sub-directory out of the way first, because one of its
        # children may share its name. Pick a temporary name that none of
        # the children uses, so the moves below cannot collide with it.
        children = os.listdir(unneeded_path)
        while True:
            tmp_name = 'tmp-' + str(random.randint(0, 100000))
            if tmp_name not in children:
                break
        tmp_path = os.path.join(path, tmp_name)

        os.rename(unneeded_path, tmp_path)
        for child in children:
            os.rename(os.path.join(tmp_path, child),
                      os.path.join(path, child))
        os.rmdir(tmp_path)
class Archive(Operation):
    "Create an archive from a directory."

    # Archives get the operation's extension added to their default name.
    add_extension = True

    def __init__(self, extension, command, type):
        """Register an archiver.

        extension -- file extension of the archive (no leading dot).
        command   -- command string; must contain '%s' (in single quotes),
                     which is replaced by the shell-escaped directory name.
        type      -- MIME type string of the archive.
        """
        assert "'%s'" in command, "archive command needs a '%s' placeholder"

        Operation.__init__(self, extension)
        self.command = command
        self.type = type

    def __str__(self):
        return _('Create .%s archive') % self.extension

    def can_handle(self, data):
        "Archives are built from directories only."
        return isinstance(data, DirData)

    def save_to_stream(self, data, stream):
        """Archive the directory `data.path`, writing the result to `stream`.

        The working directory is changed to the directory's parent so that
        the archive stores names relative to it.
        """
        # Normalise first: a bare relative name has an empty dirname (which
        # chdir rejects) and a trailing slash gives an empty basename.
        parent, leaf = os.path.split(os.path.abspath(data.path))
        os.chdir(parent)
        command = self.command % shell_escape(leaf)
        pipe_through_command(command, None, stream)
# Extractors. A command containing '%s' is given the source file name;
# the others read the archive from standard input.
tgz = Extract('tgz', "gunzip -c - | tar xf -")
tbz = Extract('tar.bz2', "bunzip2 -c - | tar xf -")
tarz = Extract('tar.Z', "uncompress -c - | tar xf -")
rar = Extract('rar', "rar x '%s'")
tar = Extract('tar', "tar xf -")
rpm = Extract('rpm', "rpm2cpio - | cpio -id --quiet")
cpio = Extract('cpio', "cpio -id --quiet")
deb = Extract('deb', "ar x '%s'")
zip = Extract('zip', "unzip -q '%s'")
jar = Extract('jar', "unzip -q '%s'")

# Archivers. '%s' is replaced by the name of the directory to pack.
make_tgz = Archive('tgz', "tar cf - '%s' | gzip", 'application/x-compressed-tar')
Archive('tar.gz', "tar cf - '%s' | gzip", 'application/x-compressed-tar')
Archive('tar.bz2', "tar cf - '%s' | bzip2", 'application/x-bzip-compressed-tar')
Archive('zip', "zip -qr - '%s'", 'application/zip')
Archive('jar', "zip -qr - '%s'", 'application/x-jar')
Archive('tar', "tar cf - '%s'", 'application/x-tar')

# Note: these go afterwards so that .tar.gz matches before .gz
make_gz = Compress('gz', "gzip -c -", 'application/x-gzip')
Compress('bz2', "bzip2 -c -", 'application/x-bzip')
Compress('uue', "uuencode /dev/stdout", 'application/x-uuencoded')

gz = Decompress('gz', "gunzip -c -")
bz2 = Decompress('bz2', "bunzip2 -ck -")
uue = Decompress('uue', "uudecode -o /dev/stdout")
z = Decompress('Z', "uncompress -c -")
166 # Can bzip2 read bzip files?
# Alternative spellings of an extension, mapped to the extension each one
# stands for.
aliases = {
    'tar.gz': 'tgz',
    'tar.bz': 'tar.bz2',
    'tbz': 'tar.bz2',
    'bz': 'bz2',
}
# Set-like dict (all values are None) of every extension that some
# registered operation declares.
known_extensions = {}
for x in operations:
    # Operations that never had an extension assigned are skipped.
    if hasattr(x, 'extension'):
        known_extensions[x.extension] = None
class FileData:
    "A file on the local filesystem."

    # Permission bits of the input file; stays None for standard input.
    mode = None

    def __init__(self, path):
        """Open `path` ('-' means standard input) and choose defaults.

        Sets:
          source       -- a seekable stream holding the data (the file
                          itself, or a temporary copy when the input is
                          stdin or cannot be rewound).
          default      -- the Operation suggested by guess_format().
          default_name -- suggested output name, never equal to `path`.

        If the file cannot be opened the error is reported through
        rox.report_exception() and the process exits with status 1.
        """
        self.path = path

        from_stdin = (path == '-')
        if from_stdin:
            # Where stdin is a text stream, read the underlying bytes.
            source = getattr(sys.stdin, 'buffer', sys.stdin)
        else:
            try:
                source = open(path, 'rb')
                self.mode = os.stat(path).st_mode
            except EnvironmentError:
                rox.report_exception()
                sys.exit(1)

        # Enough of the file to recognise its format.
        start = source.read(300)

        # Standard input is always copied; anything else is used directly
        # as long as it can be rewound.
        seekable = False
        if not from_stdin:
            try:
                source.seek(0)
                seekable = True
            except (EnvironmentError, ValueError):
                pass

        if seekable:
            self.source = source
        else:
            # Input is not a regular, local, seekable file, so copy it
            # to a local temp file.
            import shutil
            tmp = Tmp()
            tmp.write(start)
            tmp.flush()
            shutil.copyfileobj(source, tmp)
            tmp.seek(0)
            tmp.flush()
            self.source = tmp

        self.default = self.guess_format(start)

        if from_stdin:
            name = 'Data'
        else:
            # Strip the longest known extension from the input name.
            name = path
            for ext in known_extensions:
                if path.endswith('.' + ext):
                    new = path[:-len(ext) - 1]
                    if len(new) < len(name):
                        name = new
        if self.default.add_extension:
            name += '.' + self.default.extension

        if name == path:
            # Default name is same as input. Change it somehow...
            if '.' in os.path.basename(name):
                name = name[:name.rindex('.')]
            else:
                name += '.unpacked'

        self.default_name = name

    def guess_format(self, data):
        "Return a good default Operation, judging by the first 300 bytes or so."
        if not isinstance(data, bytes):
            # Text was passed in; map each character to a single byte.
            data = data.encode('latin-1')

        def string(offset, match):
            "Does `data` contain the bytes `match` at `offset`?"
            return data[offset:offset + len(match)] == match

        def short(offset, match):
            "Does `data` hold the 16-bit value `match` at `offset`, in either byte order?"
            # bytearray yields integers on every Python version, so the
            # comparison with the integer magic number is meaningful.
            pair = bytearray(data[offset:offset + 2])
            if len(pair) < 2:
                return False
            low, high = match & 0xff, match >> 8
            first, second = pair
            return ((first == low and second == high) or
                    (second == low and first == high))

        path = self.path

        # Archives
        if string(257, b'ustar\0') or string(257, b'ustar\040\040\0'):
            return tar
        if short(0, 0o70707) or short(0, 0o143561) or \
           string(0, b'070707') or string(0, b'070701') or \
           string(0, b'070702'):
            return cpio
        if string(0, b'!<arch>') or string(0, b'\\<ar>') or string(0, b'<ar>'):
            # Only Debian packages are handled; other ar files fall through.
            if string(7, b'\ndebian'):
                return deb
        if string(0, b'Rar!'):
            return rar
        if string(0, b'PK\003\004'):
            return zip
        if string(0, b'PK00'):
            return zip
        if string(0, b'\xed\xab\xee\xdb'):
            return rpm

        # Compressed streams
        if string(0, b'\037\213'):
            if path.endswith(('.tar.gz', '.tgz')):
                return tgz
            return gz
        if string(0, b'BZh') or string(0, b'BZ'):
            if path.endswith(('.tar.bz', '.tar.bz2', '.tbz', '.tbz2')):
                return tbz
            return bz2
        if string(0, b'begin '):
            return uue
        if string(0, b'\037\235'):
            if path.endswith('.tar.Z'):
                return tarz
            return z

        # Nothing recognised: offer to compress the file.
        return make_gz
class DirData:
    "A directory on the local filesystem."

    # Directories carry no permission bits here.
    mode = None

    def __init__(self, path):
        # Directories are packed into a .tgz archive by default, and the
        # suggested output name is the directory path plus that extension.
        operation = make_tgz
        self.path = path
        self.default = operation
        self.default_name = path + '.' + operation.extension