Some people are reporting bogus data coming back from Zeroconf scans,
[pyTivo/TheBayer.git] / mutagen / _util.py
blob69078c27b5c48b5ff1b944490a1f6554b1c7ac3f
1 # Copyright 2006 Joe Wreschnig <piman@sacredchao.net>
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License version 2 as
5 # published by the Free Software Foundation.
7 # $Id: _util.py 4218 2007-12-02 06:11:20Z piman $
9 """Utility classes for Mutagen.
11 You should not rely on the interfaces here being stable. They are
12 intended for internal use in Mutagen only.
13 """
15 import struct
17 class DictMixin(object):
18 """Implement the dict API using keys() and __*item__ methods.
20 Similar to UserDict.DictMixin, this takes a class that defines
21 __getitem__, __setitem__, __delitem__, and keys(), and turns it
22 into a full dict-like object.
24 UserDict.DictMixin is not suitable for this purpose because it's
25 an old-style class.
27 This class is not optimized for very large dictionaries; many
28 functions have linear memory requirements. I recommend you
29 override some of these functions if speed is required.
30 """
32 def __iter__(self):
33 return iter(self.keys())
35 def has_key(self, key):
36 try: self[key]
37 except KeyError: return False
38 else: return True
39 __contains__ = has_key
41 iterkeys = lambda self: iter(self.keys())
43 def values(self):
44 return map(self.__getitem__, self.keys())
45 itervalues = lambda self: iter(self.values())
47 def items(self):
48 return zip(self.keys(), self.values())
49 iteritems = lambda s: iter(s.items())
51 def clear(self):
52 map(self.__delitem__, self.keys())
54 def pop(self, key, *args):
55 if len(args) > 1:
56 raise TypeError("pop takes at most two arguments")
57 try: value = self[key]
58 except KeyError:
59 if args: return args[0]
60 else: raise
61 del(self[key])
62 return value
64 def popitem(self):
65 try:
66 key = self.keys()[0]
67 return key, self.pop(key)
68 except IndexError: raise KeyError("dictionary is empty")
70 def update(self, other=None, **kwargs):
71 if other is None:
72 self.update(kwargs)
73 other = {}
75 try: map(self.__setitem__, other.keys(), other.values())
76 except AttributeError:
77 for key, value in other:
78 self[key] = value
80 def setdefault(self, key, default=None):
81 try: return self[key]
82 except KeyError:
83 self[key] = default
84 return default
86 def get(self, key, default=None):
87 try: return self[key]
88 except KeyError: return default
90 def __repr__(self):
91 return repr(dict(self.items()))
93 def __cmp__(self, other):
94 if other is None: return 1
95 else: return cmp(dict(self.items()), other)
97 def __len__(self):
98 return len(self.keys())
100 class DictProxy(DictMixin):
101 def __init__(self, *args, **kwargs):
102 self.__dict = {}
103 super(DictProxy, self).__init__(*args, **kwargs)
105 def __getitem__(self, key):
106 return self.__dict[key]
108 def __setitem__(self, key, value):
109 self.__dict[key] = value
111 def __delitem__(self, key):
112 del(self.__dict[key])
114 def keys(self):
115 return self.__dict.keys()
117 class cdata(object):
118 """C character buffer to Python numeric type conversions."""
120 from struct import error
122 short_le = staticmethod(lambda data: struct.unpack('<h', data)[0])
123 ushort_le = staticmethod(lambda data: struct.unpack('<H', data)[0])
125 short_be = staticmethod(lambda data: struct.unpack('>h', data)[0])
126 ushort_be = staticmethod(lambda data: struct.unpack('>H', data)[0])
128 int_le = staticmethod(lambda data: struct.unpack('<i', data)[0])
129 uint_le = staticmethod(lambda data: struct.unpack('<I', data)[0])
131 int_be = staticmethod(lambda data: struct.unpack('>i', data)[0])
132 uint_be = staticmethod(lambda data: struct.unpack('>I', data)[0])
134 longlong_le = staticmethod(lambda data: struct.unpack('<q', data)[0])
135 ulonglong_le = staticmethod(lambda data: struct.unpack('<Q', data)[0])
137 longlong_be = staticmethod(lambda data: struct.unpack('>q', data)[0])
138 ulonglong_be = staticmethod(lambda data: struct.unpack('>Q', data)[0])
140 to_short_le = staticmethod(lambda data: struct.pack('<h', data))
141 to_ushort_le = staticmethod(lambda data: struct.pack('<H', data))
143 to_short_be = staticmethod(lambda data: struct.pack('>h', data))
144 to_ushort_be = staticmethod(lambda data: struct.pack('>H', data))
146 to_int_le = staticmethod(lambda data: struct.pack('<i', data))
147 to_uint_le = staticmethod(lambda data: struct.pack('<I', data))
149 to_int_be = staticmethod(lambda data: struct.pack('>i', data))
150 to_uint_be = staticmethod(lambda data: struct.pack('>I', data))
152 to_longlong_le = staticmethod(lambda data: struct.pack('<q', data))
153 to_ulonglong_le = staticmethod(lambda data: struct.pack('<Q', data))
155 to_longlong_be = staticmethod(lambda data: struct.pack('>q', data))
156 to_ulonglong_be = staticmethod(lambda data: struct.pack('>Q', data))
158 bitswap = ''.join([chr(sum([((val >> i) & 1) << (7-i) for i in range(8)]))
159 for val in range(256)])
160 del(i)
161 del(val)
163 test_bit = staticmethod(lambda value, n: bool((value >> n) & 1))
165 def lock(fileobj):
166 """Lock a file object 'safely'.
168 That means a failure to lock because the platform doesn't
169 support fcntl or filesystem locks is not considered a
170 failure. This call does block.
172 Returns whether or not the lock was successful, or
173 raises an exception in more extreme circumstances (full
174 lock table, invalid file).
176 try: import fcntl
177 except ImportError:
178 return False
179 else:
180 try: fcntl.lockf(fileobj, fcntl.LOCK_EX)
181 except IOError:
182 # FIXME: There's possibly a lot of complicated
183 # logic that needs to go here in case the IOError
184 # is EACCES or EAGAIN.
185 return False
186 else:
187 return True
189 def unlock(fileobj):
190 """Unlock a file object.
192 Don't call this on a file object unless a call to lock()
193 returned true.
195 # If this fails there's a mismatched lock/unlock pair,
196 # so we definitely don't want to ignore errors.
197 import fcntl
198 fcntl.lockf(fileobj, fcntl.LOCK_UN)
200 def insert_bytes(fobj, size, offset, BUFFER_SIZE=2**16):
201 """Insert size bytes of empty space starting at offset.
203 fobj must be an open file object, open rb+ or
204 equivalent. Mutagen tries to use mmap to resize the file, but
205 falls back to a significantly slower method if mmap fails.
207 assert 0 < size
208 assert 0 <= offset
209 locked = False
210 fobj.seek(0, 2)
211 filesize = fobj.tell()
212 movesize = filesize - offset
213 fobj.write('\x00' * size)
214 fobj.flush()
215 try:
216 try:
217 import mmap
218 map = mmap.mmap(fobj.fileno(), filesize + size)
219 try: map.move(offset + size, offset, movesize)
220 finally: map.close()
221 except (ValueError, EnvironmentError, ImportError):
222 # handle broken mmap scenarios
223 locked = lock(fobj)
224 fobj.truncate(filesize)
226 fobj.seek(0, 2)
227 padsize = size
228 # Don't generate an enormous string if we need to pad
229 # the file out several megs.
230 while padsize:
231 addsize = min(BUFFER_SIZE, padsize)
232 fobj.write("\x00" * addsize)
233 padsize -= addsize
235 fobj.seek(filesize, 0)
236 while movesize:
237 # At the start of this loop, fobj is pointing at the end
238 # of the data we need to move, which is of movesize length.
239 thismove = min(BUFFER_SIZE, movesize)
240 # Seek back however much we're going to read this frame.
241 fobj.seek(-thismove, 1)
242 nextpos = fobj.tell()
243 # Read it, so we're back at the end.
244 data = fobj.read(thismove)
245 # Seek back to where we need to write it.
246 fobj.seek(-thismove + size, 1)
247 # Write it.
248 fobj.write(data)
249 # And seek back to the end of the unmoved data.
250 fobj.seek(nextpos)
251 movesize -= thismove
253 fobj.flush()
254 finally:
255 if locked:
256 unlock(fobj)
258 def delete_bytes(fobj, size, offset, BUFFER_SIZE=2**16):
259 """Delete size bytes of empty space starting at offset.
261 fobj must be an open file object, open rb+ or
262 equivalent. Mutagen tries to use mmap to resize the file, but
263 falls back to a significantly slower method if mmap fails.
265 locked = False
266 assert 0 < size
267 assert 0 <= offset
268 fobj.seek(0, 2)
269 filesize = fobj.tell()
270 movesize = filesize - offset - size
271 assert 0 <= movesize
272 try:
273 if movesize > 0:
274 fobj.flush()
275 try:
276 import mmap
277 map = mmap.mmap(fobj.fileno(), filesize)
278 try: map.move(offset, offset + size, movesize)
279 finally: map.close()
280 except (ValueError, EnvironmentError, ImportError):
281 # handle broken mmap scenarios
282 locked = lock(fobj)
283 fobj.seek(offset + size)
284 buf = fobj.read(BUFFER_SIZE)
285 while buf:
286 fobj.seek(offset)
287 fobj.write(buf)
288 offset += len(buf)
289 fobj.seek(offset + size)
290 buf = fobj.read(BUFFER_SIZE)
291 fobj.truncate(filesize - size)
292 fobj.flush()
293 finally:
294 if locked:
295 unlock(fobj)
297 def utf8(data):
298 """Convert a basestring to a valid UTF-8 str."""
299 if isinstance(data, str):
300 return data.decode("utf-8", "replace").encode("utf-8")
301 elif isinstance(data, unicode):
302 return data.encode("utf-8")
303 else: raise TypeError("only unicode/str types can be converted to UTF-8")