Bug fixes and code tidying.
[rox-archive.git] / formats.py
blobaaedb89c040dbfd10814edc7f7e996e5068b4b94
1 import os, sys
2 from support import pipe_through_command, escape, Tmp
3 import rox
5 operations = []
6 class Operation:
7 add_extension = 0
9 def __init__(self, extension):
10 operations.append(self)
11 self.extension = extension
13 def can_handle(self, data):
14 return isinstance(data, FileData)
16 def save_to_stream(self, data, stream):
17 pipe_through_command(self.command, data.source, stream)
19 class Compress(Operation):
20 "Compress a stream into another stream."
21 add_extension = 1
23 def __init__(self, extension, command, type):
24 Operation.__init__(self, extension)
25 self.command = command
26 self.type = type
28 def __str__(self):
29 return 'Compress as .%s' % self.extension
31 class Decompress(Operation):
32 "Decompress a stream into another stream."
33 type = 'text/plain'
35 def __init__(self, extension, command):
36 Operation.__init__(self, extension)
37 self.command = command
39 def __str__(self):
40 return 'Decompress .%s' % self.extension
42 class Extract(Operation):
43 "Extract an archive to a directory."
44 type = 'inode/directory'
46 def __init__(self, extension, command):
47 "If command has a %s then the source path is inserted, else uses stdin."
48 Operation.__init__(self, extension)
49 self.command = command
51 def __str__(self):
52 return 'Extract from a .%s' % self.extension
54 def save_to_stream(self, data, stream):
55 raise Exception('This operation creates a directory, so you have '
56 'to drag to a filer window on the local machine')
58 def save_to_file(self, data, path):
59 if os.path.exists(path):
60 if not os.path.isdir(path):
61 raise Exception("'%s' already exists and is not a directory!" %
62 path)
63 if not rox.confirm('Directory already exists; extract into it?',
64 g.STOCK_YES):
65 raise Exception('Extraction aborted... try somewhere else')
66 if not os.path.exists(path):
67 os.mkdir(path)
68 os.chdir(path)
69 command = self.command
70 source = data.source
71 if command.find("'%s'") != -1:
72 # TODO: Handle path being '-'
73 command = command % escape(data.path)
74 source = None
75 pipe_through_command(command, source, None)
77 class Archive(Operation):
78 "Create an archive from a directory."
79 add_extension = 1
81 def __init__(self, extension, command, type):
82 assert command.find("'%s'") != -1
84 Operation.__init__(self, extension)
85 self.command = command
86 self.type = type
88 def __str__(self):
89 return 'Create .%s archive' % self.extension
91 def can_handle(self, data):
92 return isinstance(data, DirData)
94 def save_to_stream(self, data, stream):
95 os.chdir(os.path.dirname(data.path))
96 command = self.command % escape(os.path.basename(data.path))
97 pipe_through_command(command, None, stream)
99 tgz = Extract('tgz', "gunzip -c - | tar xf -")
100 tbz = Extract('tar.bz2', "bunzip2 -c - | tar xf -")
101 jar = Extract('jar', "unzip -q -")
102 rar = Extract('rar', "rar x -")
103 tar = Extract('tar', "tar xf -")
104 rpm = Extract('rpm', "rpm2cpio - | cpio -id --quiet")
105 cpio = Extract('cpio', "cpio -id --quiet")
106 deb = Extract('deb', "ar x '%s'")
107 zip = Extract('zip', "unzip -q '%s'")
109 make_tgz = Archive('tgz', "tar cf - '%s' | gzip", 'application/x-compressed-tar')
110 Archive('tar.gz', "tar cf - '%s' | gzip", 'application/x-compressed-tar')
111 Archive('tar.bz', "tar cf - '%s' | bzip2", 'application/x-bzip-compressed-tar')
112 Archive('tar.bz2', "tar cf - '%s' | bzip2", 'application/x-bzip-compressed-tar')
113 Archive('zip', "zip -qr - '%s'", 'application/zip'),
114 Archive('jar', "zip -qr - '%s'", 'application/x-jar')
115 Archive('tar', "tar cf - '%s'", 'application/x-tar')
117 # Note: these go afterwards so that .tar.gz matches before .gz
118 make_gz = Compress('gz', "gzip -c -", 'application/x-gzip')
119 Compress('bz2', "bzip2 -c -", 'application/x-bzip')
121 gz = Decompress('gz', "gunzip -c -")
122 bz2 = Decompress('bz2', "bunzip2 -ck -")
125 # Can bzip2 read bzip files?
127 aliases = {
128 'tar.gz': 'tgz',
129 'tar.bz': 'tar.bz2',
130 'bz': 'bz2'
133 known_extensions = {}
134 for x in operations:
135 try:
136 known_extensions[x.extension] = None
137 except AttributeError:
138 pass
140 class FileData:
141 "A file on the local filesystem."
142 def __init__(self, path):
143 self.path = path
145 if path == '-':
146 source = sys.stdin
147 else:
148 try:
149 source = file(path)
150 except:
151 rox.report_exception()
152 sys.exit(1)
154 self.path = path
155 start = source.read(300)
156 try:
157 source.seek(0)
158 self.source = source
159 except:
160 # Input is not a regular, local, seekable file, so copy it
161 # to a local temp file.
162 import shutil
163 tmp = Tmp()
164 tmp.write(start)
165 tmp.flush()
166 shutil.copyfileobj(source, tmp)
167 self.source = tmp
168 self.source.seek(0)
169 self.default = self.guess_format(start)
171 if path == '-':
172 name = 'Data'
173 else:
174 name = path
175 for ext in known_extensions:
176 if path.endswith('.' + ext):
177 new = path[:-len(ext)-1]
178 if len(new) < len(name):
179 name = new
180 if self.default.add_extension:
181 name += '.' + self.default.extension
182 self.default_name = name
184 def guess_format(self, data):
185 "Return a good default Operation, judging by the first 300 bytes or so."
186 l = len(data)
187 def string(offset, match):
188 return data[offset:offset + len(match)] == match
189 def short(offset, match):
190 if l > offset + 1:
191 a = data[offset]
192 b = data[offset + 1]
193 return ((a == match & 0xff) and (b == (match >> 8))) or \
194 (b == match & 0xff) and (a == (match >> 8))
195 return 0
197 # Archives
198 if string(257, 'ustar\0') or string(257, 'ustar\040\040\0'):
199 return tar
200 if short(0, 070707) or short(0, 0143561) or string(0, '070707') or \
201 string(0, '070701') or string(0, '070702'):
202 return cpio
203 if string(0, '!<arch>') or string(0, '\\<ar>') or string(0, '<ar>'):
204 if string(7, '\ndebian'):
205 return deb
206 if string(0, 'Rar!'): return rar
207 if string(0, 'PK\003\004'): return zip
209 # Compressed streams
210 if string(0, '\037\213'):
211 if self.path.endswith('.tar.gz') or self.path.endswith('.tgz'):
212 return tgz
213 return gz
214 if string(0, 'BZh') or string(0, 'BZ'):
215 if self.path.endswith('.tar.bz') or self.path.endswith('.tar.bz2') or \
216 self.path.endswith('.tbz') or self.path.endswith('.tbz2'):
217 return tbz
218 return bz2
220 return make_gz
222 class DirData:
223 def __init__(self, path):
224 self.path = path
225 self.default = make_tgz
226 self.default_name = path + '.' + self.default.extension