1 # Copyright 2006 Joe Wreschnig
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License version 2 as
5 # published by the Free Software Foundation.
7 # $Id: _util.py 4218 2007-12-02 06:11:20Z piman $
9 """Utility classes for Mutagen.
11 You should not rely on the interfaces here being stable. They are
12 intended for internal use in Mutagen only.
17 from fnmatch
import fnmatchcase
19 class DictMixin(object):
20 """Implement the dict API using keys() and __*item__ methods.
22 Similar to UserDict.DictMixin, this takes a class that defines
23 __getitem__, __setitem__, __delitem__, and keys(), and turns it
24 into a full dict-like object.
26 UserDict.DictMixin is not suitable for this purpose because it's
29 This class is not optimized for very large dictionaries; many
30 functions have linear memory requirements. I recommend you
31 override some of these functions if speed is required.
35 return iter(self
.keys())
37 def has_key(self
, key
):
39 except KeyError: return False
41 __contains__
= has_key
43 iterkeys
= lambda self
: iter(self
.keys())
46 return map(self
.__getitem
__, self
.keys())
47 itervalues
= lambda self
: iter(self
.values())
50 return zip(self
.keys(), self
.values())
51 iteritems
= lambda s
: iter(s
.items())
54 map(self
.__delitem
__, self
.keys())
56 def pop(self
, key
, *args
):
58 raise TypeError("pop takes at most two arguments")
59 try: value
= self
[key
]
61 if args
: return args
[0]
69 return key
, self
.pop(key
)
70 except IndexError: raise KeyError("dictionary is empty")
72 def update(self
, other
=None, **kwargs
):
77 try: map(self
.__setitem
__, other
.keys(), other
.values())
78 except AttributeError:
79 for key
, value
in other
:
82 def setdefault(self
, key
, default
=None):
88 def get(self
, key
, default
=None):
90 except KeyError: return default
93 return repr(dict(self
.items()))
95 def __cmp__(self
, other
):
96 if other
is None: return 1
97 else: return cmp(dict(self
.items()), other
)
99 __hash__
= object.__hash
__
102 return len(self
.keys())
104 class DictProxy(DictMixin
):
105 def __init__(self
, *args
, **kwargs
):
107 super(DictProxy
, self
).__init
__(*args
, **kwargs
)
109 def __getitem__(self
, key
):
110 return self
.__dict
[key
]
112 def __setitem__(self
, key
, value
):
113 self
.__dict
[key
] = value
115 def __delitem__(self
, key
):
116 del(self
.__dict
[key
])
119 return self
.__dict
.keys()
122 """C character buffer to Python numeric type conversions."""
124 from struct
import error
126 short_le
= staticmethod(lambda data
: struct
.unpack('<h', data
)[0])
127 ushort_le
= staticmethod(lambda data
: struct
.unpack('<H', data
)[0])
129 short_be
= staticmethod(lambda data
: struct
.unpack('>h', data
)[0])
130 ushort_be
= staticmethod(lambda data
: struct
.unpack('>H', data
)[0])
132 int_le
= staticmethod(lambda data
: struct
.unpack('<i', data
)[0])
133 uint_le
= staticmethod(lambda data
: struct
.unpack('<I', data
)[0])
135 int_be
= staticmethod(lambda data
: struct
.unpack('>i', data
)[0])
136 uint_be
= staticmethod(lambda data
: struct
.unpack('>I', data
)[0])
138 longlong_le
= staticmethod(lambda data
: struct
.unpack('<q', data
)[0])
139 ulonglong_le
= staticmethod(lambda data
: struct
.unpack('<Q', data
)[0])
141 longlong_be
= staticmethod(lambda data
: struct
.unpack('>q', data
)[0])
142 ulonglong_be
= staticmethod(lambda data
: struct
.unpack('>Q', data
)[0])
144 to_short_le
= staticmethod(lambda data
: struct
.pack('<h', data
))
145 to_ushort_le
= staticmethod(lambda data
: struct
.pack('<H', data
))
147 to_short_be
= staticmethod(lambda data
: struct
.pack('>h', data
))
148 to_ushort_be
= staticmethod(lambda data
: struct
.pack('>H', data
))
150 to_int_le
= staticmethod(lambda data
: struct
.pack('<i', data
))
151 to_uint_le
= staticmethod(lambda data
: struct
.pack('<I', data
))
153 to_int_be
= staticmethod(lambda data
: struct
.pack('>i', data
))
154 to_uint_be
= staticmethod(lambda data
: struct
.pack('>I', data
))
156 to_longlong_le
= staticmethod(lambda data
: struct
.pack('<q', data
))
157 to_ulonglong_le
= staticmethod(lambda data
: struct
.pack('<Q', data
))
159 to_longlong_be
= staticmethod(lambda data
: struct
.pack('>q', data
))
160 to_ulonglong_be
= staticmethod(lambda data
: struct
.pack('>Q', data
))
162 bitswap
= ''.join([chr(sum([((val
>> i
) & 1) << (7-i
) for i
in range(8)]))
163 for val
in range(256)])
167 test_bit
= staticmethod(lambda value
, n
: bool((value
>> n
) & 1))
170 """Lock a file object 'safely'.
172 That means a failure to lock because the platform doesn't
173 support fcntl or filesystem locks is not considered a
174 failure. This call does block.
176 Returns whether or not the lock was successful, or
177 raises an exception in more extreme circumstances (full
178 lock table, invalid file).
184 try: fcntl
.lockf(fileobj
, fcntl
.LOCK_EX
)
186 # FIXME: There's possibly a lot of complicated
187 # logic that needs to go here in case the IOError
188 # is EACCES or EAGAIN.
194 """Unlock a file object.
196 Don't call this on a file object unless a call to lock()
199 # If this fails there's a mismatched lock/unlock pair,
200 # so we definitely don't want to ignore errors.
202 fcntl
.lockf(fileobj
, fcntl
.LOCK_UN
)
204 def insert_bytes(fobj
, size
, offset
, BUFFER_SIZE
=2**16):
205 """Insert size bytes of empty space starting at offset.
207 fobj must be an open file object, open rb+ or
208 equivalent. Mutagen tries to use mmap to resize the file, but
209 falls back to a significantly slower method if mmap fails.
215 filesize
= fobj
.tell()
216 movesize
= filesize
- offset
217 fobj
.write('\x00' * size
)
222 map = mmap
.mmap(fobj
.fileno(), filesize
+ size
)
223 try: map.move(offset
+ size
, offset
, movesize
)
225 except (ValueError, EnvironmentError, ImportError):
226 # handle broken mmap scenarios
228 fobj
.truncate(filesize
)
232 # Don't generate an enormous string if we need to pad
233 # the file out several megs.
235 addsize
= min(BUFFER_SIZE
, padsize
)
236 fobj
.write("\x00" * addsize
)
239 fobj
.seek(filesize
, 0)
241 # At the start of this loop, fobj is pointing at the end
242 # of the data we need to move, which is of movesize length.
243 thismove
= min(BUFFER_SIZE
, movesize
)
244 # Seek back however much we're going to read this frame.
245 fobj
.seek(-thismove
, 1)
246 nextpos
= fobj
.tell()
247 # Read it, so we're back at the end.
248 data
= fobj
.read(thismove
)
249 # Seek back to where we need to write it.
250 fobj
.seek(-thismove
+ size
, 1)
253 # And seek back to the end of the unmoved data.
262 def delete_bytes(fobj
, size
, offset
, BUFFER_SIZE
=2**16):
263 """Delete size bytes of empty space starting at offset.
265 fobj must be an open file object, open rb+ or
266 equivalent. Mutagen tries to use mmap to resize the file, but
267 falls back to a significantly slower method if mmap fails.
273 filesize
= fobj
.tell()
274 movesize
= filesize
- offset
- size
281 map = mmap
.mmap(fobj
.fileno(), filesize
)
282 try: map.move(offset
, offset
+ size
, movesize
)
284 except (ValueError, EnvironmentError, ImportError):
285 # handle broken mmap scenarios
287 fobj
.seek(offset
+ size
)
288 buf
= fobj
.read(BUFFER_SIZE
)
293 fobj
.seek(offset
+ size
)
294 buf
= fobj
.read(BUFFER_SIZE
)
295 fobj
.truncate(filesize
- size
)
302 """Convert a basestring to a valid UTF-8 str."""
303 if isinstance(data
, str):
304 return data
.decode("utf-8", "replace").encode("utf-8")
305 elif isinstance(data
, unicode):
306 return data
.encode("utf-8")
307 else: raise TypeError("only unicode/str types can be converted to UTF-8")
309 def dict_match(d
, key
, default
=None):
313 for pattern
, value
in d
.iteritems():
314 if fnmatchcase(key
, pattern
):