The regexes for some music playlist types didn't necessarily stop
[pyTivo/wmcbrine.git] / mutagen / _util.py
blob819551bdaf303fbf901b986a378723cc198aaf9a
1 # Copyright 2006 Joe Wreschnig
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License version 2 as
5 # published by the Free Software Foundation.
7 # $Id: _util.py 4218 2007-12-02 06:11:20Z piman $
9 """Utility classes for Mutagen.
11 You should not rely on the interfaces here being stable. They are
12 intended for internal use in Mutagen only.
13 """
15 import struct
17 from fnmatch import fnmatchcase
19 class DictMixin(object):
20 """Implement the dict API using keys() and __*item__ methods.
22 Similar to UserDict.DictMixin, this takes a class that defines
23 __getitem__, __setitem__, __delitem__, and keys(), and turns it
24 into a full dict-like object.
26 UserDict.DictMixin is not suitable for this purpose because it's
27 an old-style class.
29 This class is not optimized for very large dictionaries; many
30 functions have linear memory requirements. I recommend you
31 override some of these functions if speed is required.
32 """
34 def __iter__(self):
35 return iter(self.keys())
37 def has_key(self, key):
38 try: self[key]
39 except KeyError: return False
40 else: return True
41 __contains__ = has_key
43 iterkeys = lambda self: iter(self.keys())
45 def values(self):
46 return map(self.__getitem__, self.keys())
47 itervalues = lambda self: iter(self.values())
49 def items(self):
50 return zip(self.keys(), self.values())
51 iteritems = lambda s: iter(s.items())
53 def clear(self):
54 map(self.__delitem__, self.keys())
56 def pop(self, key, *args):
57 if len(args) > 1:
58 raise TypeError("pop takes at most two arguments")
59 try: value = self[key]
60 except KeyError:
61 if args: return args[0]
62 else: raise
63 del(self[key])
64 return value
66 def popitem(self):
67 try:
68 key = self.keys()[0]
69 return key, self.pop(key)
70 except IndexError: raise KeyError("dictionary is empty")
72 def update(self, other=None, **kwargs):
73 if other is None:
74 self.update(kwargs)
75 other = {}
77 try: map(self.__setitem__, other.keys(), other.values())
78 except AttributeError:
79 for key, value in other:
80 self[key] = value
82 def setdefault(self, key, default=None):
83 try: return self[key]
84 except KeyError:
85 self[key] = default
86 return default
88 def get(self, key, default=None):
89 try: return self[key]
90 except KeyError: return default
92 def __repr__(self):
93 return repr(dict(self.items()))
95 def __cmp__(self, other):
96 if other is None: return 1
97 else: return cmp(dict(self.items()), other)
99 __hash__ = object.__hash__
101 def __len__(self):
102 return len(self.keys())
104 class DictProxy(DictMixin):
105 def __init__(self, *args, **kwargs):
106 self.__dict = {}
107 super(DictProxy, self).__init__(*args, **kwargs)
109 def __getitem__(self, key):
110 return self.__dict[key]
112 def __setitem__(self, key, value):
113 self.__dict[key] = value
115 def __delitem__(self, key):
116 del(self.__dict[key])
118 def keys(self):
119 return self.__dict.keys()
121 class cdata(object):
122 """C character buffer to Python numeric type conversions."""
124 from struct import error
126 short_le = staticmethod(lambda data: struct.unpack('<h', data)[0])
127 ushort_le = staticmethod(lambda data: struct.unpack('<H', data)[0])
129 short_be = staticmethod(lambda data: struct.unpack('>h', data)[0])
130 ushort_be = staticmethod(lambda data: struct.unpack('>H', data)[0])
132 int_le = staticmethod(lambda data: struct.unpack('<i', data)[0])
133 uint_le = staticmethod(lambda data: struct.unpack('<I', data)[0])
135 int_be = staticmethod(lambda data: struct.unpack('>i', data)[0])
136 uint_be = staticmethod(lambda data: struct.unpack('>I', data)[0])
138 longlong_le = staticmethod(lambda data: struct.unpack('<q', data)[0])
139 ulonglong_le = staticmethod(lambda data: struct.unpack('<Q', data)[0])
141 longlong_be = staticmethod(lambda data: struct.unpack('>q', data)[0])
142 ulonglong_be = staticmethod(lambda data: struct.unpack('>Q', data)[0])
144 to_short_le = staticmethod(lambda data: struct.pack('<h', data))
145 to_ushort_le = staticmethod(lambda data: struct.pack('<H', data))
147 to_short_be = staticmethod(lambda data: struct.pack('>h', data))
148 to_ushort_be = staticmethod(lambda data: struct.pack('>H', data))
150 to_int_le = staticmethod(lambda data: struct.pack('<i', data))
151 to_uint_le = staticmethod(lambda data: struct.pack('<I', data))
153 to_int_be = staticmethod(lambda data: struct.pack('>i', data))
154 to_uint_be = staticmethod(lambda data: struct.pack('>I', data))
156 to_longlong_le = staticmethod(lambda data: struct.pack('<q', data))
157 to_ulonglong_le = staticmethod(lambda data: struct.pack('<Q', data))
159 to_longlong_be = staticmethod(lambda data: struct.pack('>q', data))
160 to_ulonglong_be = staticmethod(lambda data: struct.pack('>Q', data))
162 bitswap = ''.join([chr(sum([((val >> i) & 1) << (7-i) for i in range(8)]))
163 for val in range(256)])
164 del(i)
165 del(val)
167 test_bit = staticmethod(lambda value, n: bool((value >> n) & 1))
169 def lock(fileobj):
170 """Lock a file object 'safely'.
172 That means a failure to lock because the platform doesn't
173 support fcntl or filesystem locks is not considered a
174 failure. This call does block.
176 Returns whether or not the lock was successful, or
177 raises an exception in more extreme circumstances (full
178 lock table, invalid file).
180 try: import fcntl
181 except ImportError:
182 return False
183 else:
184 try: fcntl.lockf(fileobj, fcntl.LOCK_EX)
185 except IOError:
186 # FIXME: There's possibly a lot of complicated
187 # logic that needs to go here in case the IOError
188 # is EACCES or EAGAIN.
189 return False
190 else:
191 return True
193 def unlock(fileobj):
194 """Unlock a file object.
196 Don't call this on a file object unless a call to lock()
197 returned true.
199 # If this fails there's a mismatched lock/unlock pair,
200 # so we definitely don't want to ignore errors.
201 import fcntl
202 fcntl.lockf(fileobj, fcntl.LOCK_UN)
204 def insert_bytes(fobj, size, offset, BUFFER_SIZE=2**16):
205 """Insert size bytes of empty space starting at offset.
207 fobj must be an open file object, open rb+ or
208 equivalent. Mutagen tries to use mmap to resize the file, but
209 falls back to a significantly slower method if mmap fails.
211 assert 0 < size
212 assert 0 <= offset
213 locked = False
214 fobj.seek(0, 2)
215 filesize = fobj.tell()
216 movesize = filesize - offset
217 fobj.write('\x00' * size)
218 fobj.flush()
219 try:
220 try:
221 import mmap
222 map = mmap.mmap(fobj.fileno(), filesize + size)
223 try: map.move(offset + size, offset, movesize)
224 finally: map.close()
225 except (ValueError, EnvironmentError, ImportError):
226 # handle broken mmap scenarios
227 locked = lock(fobj)
228 fobj.truncate(filesize)
230 fobj.seek(0, 2)
231 padsize = size
232 # Don't generate an enormous string if we need to pad
233 # the file out several megs.
234 while padsize:
235 addsize = min(BUFFER_SIZE, padsize)
236 fobj.write("\x00" * addsize)
237 padsize -= addsize
239 fobj.seek(filesize, 0)
240 while movesize:
241 # At the start of this loop, fobj is pointing at the end
242 # of the data we need to move, which is of movesize length.
243 thismove = min(BUFFER_SIZE, movesize)
244 # Seek back however much we're going to read this frame.
245 fobj.seek(-thismove, 1)
246 nextpos = fobj.tell()
247 # Read it, so we're back at the end.
248 data = fobj.read(thismove)
249 # Seek back to where we need to write it.
250 fobj.seek(-thismove + size, 1)
251 # Write it.
252 fobj.write(data)
253 # And seek back to the end of the unmoved data.
254 fobj.seek(nextpos)
255 movesize -= thismove
257 fobj.flush()
258 finally:
259 if locked:
260 unlock(fobj)
262 def delete_bytes(fobj, size, offset, BUFFER_SIZE=2**16):
263 """Delete size bytes of empty space starting at offset.
265 fobj must be an open file object, open rb+ or
266 equivalent. Mutagen tries to use mmap to resize the file, but
267 falls back to a significantly slower method if mmap fails.
269 locked = False
270 assert 0 < size
271 assert 0 <= offset
272 fobj.seek(0, 2)
273 filesize = fobj.tell()
274 movesize = filesize - offset - size
275 assert 0 <= movesize
276 try:
277 if movesize > 0:
278 fobj.flush()
279 try:
280 import mmap
281 map = mmap.mmap(fobj.fileno(), filesize)
282 try: map.move(offset, offset + size, movesize)
283 finally: map.close()
284 except (ValueError, EnvironmentError, ImportError):
285 # handle broken mmap scenarios
286 locked = lock(fobj)
287 fobj.seek(offset + size)
288 buf = fobj.read(BUFFER_SIZE)
289 while buf:
290 fobj.seek(offset)
291 fobj.write(buf)
292 offset += len(buf)
293 fobj.seek(offset + size)
294 buf = fobj.read(BUFFER_SIZE)
295 fobj.truncate(filesize - size)
296 fobj.flush()
297 finally:
298 if locked:
299 unlock(fobj)
301 def utf8(data):
302 """Convert a basestring to a valid UTF-8 str."""
303 if isinstance(data, str):
304 return data.decode("utf-8", "replace").encode("utf-8")
305 elif isinstance(data, unicode):
306 return data.encode("utf-8")
307 else: raise TypeError("only unicode/str types can be converted to UTF-8")
309 def dict_match(d, key, default=None):
310 try:
311 return d[key]
312 except KeyError:
313 for pattern, value in d.iteritems():
314 if fnmatchcase(key, pattern):
315 return value
316 return default