Accommodate some broken installations of the Python Imaging Library.
[pyTivo/TheBayer.git] / mutagen / asf.py
blob816ff5e8ca3510ce72e73ad18b1e9727aad851c6
1 # Copyright 2006-2007 Lukas Lalinsky
2 # Copyright 2005-2006 Joe Wreschnig
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License version 2 as
6 # published by the Free Software Foundation.
8 # $Id: asf.py 4224 2007-12-03 09:01:49Z luks $
10 """Read and write ASF (Windows Media Audio) files."""
12 __all__ = ["ASF", "Open"]
14 import struct
15 from mutagen import FileType, Metadata
16 from mutagen._util import insert_bytes, delete_bytes, DictMixin
class error(IOError):
    """Root of this module's exception hierarchy (an IOError)."""


class ASFError(error):
    """General ASF error."""


class ASFHeaderError(error):
    """Raised when the leading ASF header object is missing or invalid."""
class ASFInfo(object):
    """ASF stream information.

    Attributes:
    length -- duration, printed by pprint() as seconds
    sample_rate -- printed by pprint() as Hz
    bitrate -- printed by pprint() as bps
    channels -- number of channels
    """

    def __init__(self):
        # Everything starts out zeroed; the header object parsers in this
        # module overwrite these fields while a file is being read.
        self.channels = 0
        self.bitrate = 0
        self.sample_rate = 0
        self.length = 0.0

    def pprint(self):
        """Return a one-line, human-readable summary of the stream."""
        template = ("Windows Media Audio %d bps, %s Hz, "
                    "%d channels, %.2f seconds")
        fields = (self.bitrate, self.sample_rate, self.channels, self.length)
        return template % fields
class ASFTags(list, DictMixin, Metadata):
    """Dictionary containing ASF attributes.

    The tags are stored as a plain list of (key, value) pairs, so one
    key may occur several times; the mapping-style methods below are
    layered on top of that list.
    """

    def pprint(self):
        """Return the tags as "key=value" lines, one per stored pair."""
        return "\n".join(["%s=%s" % (k, v) for k, v in self])

    def __getitem__(self, key):
        """A list of values for the key.

        This is a copy, so comment['title'].append('a title') will not
        work.

        Raises KeyError if no pair carries the key.
        """
        values = [value for (k, value) in self if k == key]
        if not values:
            raise KeyError(key)
        return values

    def __delitem__(self, key):
        """Delete all values associated with the key.

        Raises KeyError if no pair carries the key.
        """
        to_delete = [pair for pair in self if pair[0] == key]
        if not to_delete:
            raise KeyError(key)
        # An explicit loop instead of map(): the removal is a side
        # effect, not a value we want to collect.
        for pair in to_delete:
            self.remove(pair)

    def __contains__(self, key):
        """Return true if the key has any values."""
        for k, value in self:
            if k == key:
                return True
        return False

    def __setitem__(self, key, values):
        """Set a key's value or values.

        Setting a value overwrites all old ones. The value may be a
        list of Unicode or UTF-8 strings, or a single Unicode or UTF-8
        string.

        Values for the standard attribute names are converted with
        unicode(). For any other key, a value that is not already an
        ASFBaseAttribute is wrapped according to its Python type:
        strings, bools, ints and longs become Unicode, Bool, DWord and
        QWord attributes respectively.
        """
        if not isinstance(values, list):
            values = [values]
        try:
            del self[key]
        except KeyError:
            pass
        # The key does not change inside the loop, so test it once.
        is_standard = key in _standard_attribute_names
        for value in values:
            if is_standard:
                value = unicode(value)
            elif not isinstance(value, ASFBaseAttribute):
                if isinstance(value, basestring):
                    value = ASFUnicodeAttribute(value)
                # bool must be tested before int: bool is an int subclass.
                elif isinstance(value, bool):
                    value = ASFBoolAttribute(value)
                elif isinstance(value, int):
                    value = ASFDWordAttribute(value)
                elif isinstance(value, long):
                    value = ASFQWordAttribute(value)
                # NOTE(review): any other type is stored unwrapped, as
                # before; confirm whether it should be rejected instead.
            self.append((key, value))

    def keys(self):
        """Return all keys in the comment, as a set.

        An empty set is returned when there are no tags (previously the
        tag list itself was handed back in that case).
        """
        return set([k for k, value in self])

    def as_dict(self):
        """Return a copy of the comment data in a real dict."""
        d = {}
        for key, value in self:
            d.setdefault(key, []).append(value)
        return d
class ASFBaseAttribute(object):
    """Generic attribute.

    Subclasses set TYPE to their value type code and supply parse()
    and _render() to convert between raw data and the Python value.
    """
    TYPE = None

    def __init__(self, value=None, data=None, language=None,
                 stream=None, **kwargs):
        """Create an attribute from a Python value or from raw data.

        When non-empty data is given it is decoded with parse(), which
        also receives any extra keyword arguments; otherwise value is
        stored untouched.
        """
        self.language = language
        self.stream = stream
        if not data:
            self.value = value
        else:
            self.value = self.parse(data, **kwargs)

    def __repr__(self):
        # language and stream are only shown when they are truthy.
        pieces = ["%s(%r" % (type(self).__name__, self.value)]
        if self.language:
            pieces.append(", language=%d" % self.language)
        if self.stream:
            pieces.append(", stream=%d" % self.stream)
        pieces.append(")")
        return "".join(pieces)

    def render(self, name):
        """Serialize as: name length, name, type, value length, value.

        Both lengths and the type are 16-bit little-endian fields.
        """
        encoded_name = name.encode("utf-16-le") + "\x00\x00"
        payload = self._render()
        name_header = struct.pack("<H", len(encoded_name))
        value_header = struct.pack("<HH", self.TYPE, len(payload))
        return name_header + encoded_name + value_header + payload

    def render_m(self, name):
        """Serialize with a fixed 12-byte header whose first field is 0.

        The header carries the stream number (0 when unset), the name
        length, the type and a 32-bit value length.
        """
        encoded_name = name.encode("utf-16-le") + "\x00\x00"
        render_args = {}
        if self.TYPE == 2:
            # Type 2 values are rendered in their two-byte form here.
            render_args["dword"] = False
        payload = self._render(**render_args)
        header = struct.pack("<HHHHI", 0, self.stream or 0,
                             len(encoded_name), self.TYPE, len(payload))
        return header + encoded_name + payload

    def render_ml(self, name):
        """Serialize like render_m(), but with the language in field one.

        An unset language or stream is written as 0.
        """
        encoded_name = name.encode("utf-16-le") + "\x00\x00"
        render_args = {}
        if self.TYPE == 2:
            # Type 2 values are rendered in their two-byte form here.
            render_args["dword"] = False
        payload = self._render(**render_args)
        header = struct.pack("<HHHHI", self.language or 0, self.stream or 0,
                             len(encoded_name), self.TYPE, len(payload))
        return header + encoded_name + payload
class ASFUnicodeAttribute(ASFBaseAttribute):
    """Unicode string attribute, stored as UTF-16-LE text."""
    TYPE = 0x0000

    def parse(self, data):
        """Decode UTF-16-LE data and strip NUL characters from both ends."""
        text = data.decode("utf-16-le")
        return text.strip("\x00")

    def _render(self):
        """Encode the value as UTF-16-LE followed by a two-byte NUL."""
        encoded = self.value.encode("utf-16-le")
        return encoded + "\x00\x00"

    def __str__(self):
        return self.value

    def __cmp__(self, other):
        # Compare by text, so an attribute equals the string it holds.
        as_text = unicode(self)
        return cmp(as_text, other)
class ASFByteArrayAttribute(ASFBaseAttribute):
    """Byte array attribute; the value is the raw data, unmodified."""
    TYPE = 0x0001

    def parse(self, data):
        """Return the data exactly as given."""
        return data

    def _render(self):
        """Return the stored value exactly as it is."""
        return self.value

    def __str__(self):
        # Only a size summary is shown, never the payload itself.
        byte_count = len(self.value)
        return "[binary data (%s bytes)]" % byte_count

    def __cmp__(self, other):
        summary = str(self)
        return cmp(summary, other)
class ASFBoolAttribute(ASFBaseAttribute):
    """Bool attribute.

    The value is stored either as a 32-bit integer (dword=True, the
    default) or as a 16-bit integer (dword=False).
    """
    TYPE = 0x0002

    def parse(self, data, dword=True):
        """Decode data; only a stored integer equal to 1 counts as True."""
        if dword:
            return struct.unpack("<I", data)[0] == 1
        else:
            return struct.unpack("<H", data)[0] == 1

    def _render(self, dword=True):
        """Encode the value as a little-endian 0 or 1 of the chosen width."""
        if dword:
            return struct.pack("<I", int(self.value))
        else:
            return struct.pack("<H", int(self.value))

    def __bool__(self):
        # Coerce explicitly: the truth-value hook must return a real bool,
        # and value may be None when the attribute was built without one.
        return bool(self.value)

    # Python 2 consults __nonzero__, not __bool__.  Without this alias
    # bool(attr) was always True, so __cmp__ below compared True with
    # `other` regardless of the stored value.
    __nonzero__ = __bool__

    def __str__(self):
        return str(self.value)

    def __cmp__(self, other):
        return cmp(bool(self), other)
class ASFDWordAttribute(ASFBaseAttribute):
    """DWORD attribute: an unsigned 32-bit little-endian integer."""
    TYPE = 0x0003

    def parse(self, data):
        """Decode four bytes of data into an integer."""
        (number,) = struct.unpack("<L", data)
        return number

    def _render(self):
        """Encode the value as four little-endian bytes."""
        return struct.pack("<L", self.value)

    def __int__(self):
        return self.value

    def __str__(self):
        return str(self.value)

    def __cmp__(self, other):
        # Compare numerically, so an attribute equals the int it holds.
        number = int(self)
        return cmp(number, other)
class ASFQWordAttribute(ASFBaseAttribute):
    """QWORD attribute: an unsigned 64-bit little-endian integer."""
    TYPE = 0x0004

    def parse(self, data):
        """Decode eight bytes of data into an integer."""
        (number,) = struct.unpack("<Q", data)
        return number

    def _render(self):
        """Encode the value as eight little-endian bytes."""
        return struct.pack("<Q", self.value)

    def __int__(self):
        return self.value

    def __str__(self):
        return str(self.value)

    def __cmp__(self, other):
        # Compare numerically, so an attribute equals the int it holds.
        number = int(self)
        return cmp(number, other)
class ASFWordAttribute(ASFBaseAttribute):
    """WORD attribute: an unsigned 16-bit little-endian integer."""
    TYPE = 0x0005

    def parse(self, data):
        """Decode two bytes of data into an integer."""
        (number,) = struct.unpack("<H", data)
        return number

    def _render(self):
        """Encode the value as two little-endian bytes."""
        return struct.pack("<H", self.value)

    def __int__(self):
        return self.value

    def __str__(self):
        return str(self.value)

    def __cmp__(self, other):
        # Compare numerically, so an attribute equals the int it holds.
        number = int(self)
        return cmp(number, other)
class ASFGUIDAttribute(ASFBaseAttribute):
    """GUID attribute; the value is the raw data, unmodified."""
    TYPE = 0x0006

    def parse(self, data):
        """Return the data exactly as given."""
        return data

    def _render(self):
        """Return the stored value exactly as it is."""
        return self.value

    def __str__(self):
        return self.value

    def __cmp__(self, other):
        raw = str(self)
        return cmp(raw, other)
# Short aliases for the value type codes; each one is the TYPE of the
# matching attribute class and is what ASFValue() accepts as `kind`.
UNICODE = ASFUnicodeAttribute.TYPE      # 0x0000
BYTEARRAY = ASFByteArrayAttribute.TYPE  # 0x0001
BOOL = ASFBoolAttribute.TYPE            # 0x0002
DWORD = ASFDWordAttribute.TYPE          # 0x0003
QWORD = ASFQWordAttribute.TYPE          # 0x0004
WORD = ASFWordAttribute.TYPE            # 0x0005
GUID = ASFGUIDAttribute.TYPE            # 0x0006
def ASFValue(value, kind, **kwargs):
    """Create an attribute of the given type around value.

    kind is one of the registered value type codes (the keys of
    _attribute_types); any extra keyword arguments are passed on to
    the attribute class.

    Raises ValueError if kind is not a registered type code.
    """
    try:
        attribute_class = _attribute_types[kind]
    except (KeyError, TypeError):
        # TypeError covers an unhashable kind, which cannot be a
        # registered code either.
        raise ValueError("Unknown value type")
    # Built outside the try block so that errors raised by the attribute
    # class itself are not mistaken for an unknown type.
    return attribute_class(value=value, **kwargs)
# Maps every value type code to the attribute class that handles it.
_attribute_types = dict(
    (attribute_class.TYPE, attribute_class)
    for attribute_class in (
        ASFUnicodeAttribute,
        ASFByteArrayAttribute,
        ASFBoolAttribute,
        ASFDWordAttribute,
        ASFQWordAttribute,
        ASFWordAttribute,
        ASFGUIDAttribute,
    )
)
# Names that live in the content description object.  The order matters:
# ContentDescriptionObject.render() packs one length field per name in
# exactly this order.
_standard_attribute_names = [
    "Title",
    "Author",
    "Copyright",
    "Description",
    "Rating",
]
class BaseObject(object):
    """Base ASF object.

    Keeps the raw payload of an object so that it can be written back
    unchanged.  Subclasses set GUID and may override parse() and
    render().
    """
    GUID = None

    def parse(self, asf, data, fileobj, size):
        """Remember the object's payload.

        Only data is used here; asf, fileobj and size are accepted so
        that every object type shares one signature.
        """
        self.data = data

    def render(self, asf):
        """Return the serialized object: GUID, 64-bit total size, payload.

        The size field counts the 24-byte header (16-byte GUID plus the
        8-byte size field itself) in addition to the payload.
        """
        return self.GUID + struct.pack("<Q", len(self.data) + 24) + self.data
class UnknownObject(BaseObject):
    """Unknown ASF object.

    Stands in for any object whose GUID has no dedicated class; it
    carries that GUID on the instance.
    """

    def __init__(self, guid):
        # Shadows the class-level GUID with the one found in the file.
        self.GUID = guid
class HeaderObject(object):
    """ASF header.

    Only provides the GUID that identifies the top-level header object.
    """

    GUID = "\x30\x26\xB2\x75\x8E\x66\xCF\x11\xA6\xD9\x00\xAA\x00\x62\xCE\x6C"
class ContentDescriptionObject(BaseObject):
    """Content description.

    Holds five UTF-16-LE strings, preceded by five 16-bit lengths: the
    title, author, copyright, description and rating.
    """
    GUID = "\x33\x26\xB2\x75\x8E\x66\xCF\x11\xA6\xD9\x00\xAA\x00\x62\xCE\x6C"

    def parse(self, asf, data, fileobj, size):
        """Decode the five strings and store each one in asf.tags."""
        super(ContentDescriptionObject, self).parse(asf, data, fileobj, size)
        asf.content_description_obj = self
        field_lengths = struct.unpack("<HHHHH", data[:10])
        decoded = []
        start = 10
        for field_length in field_lengths:
            stop = start + field_length
            raw_text = data[start:stop]
            decoded.append(raw_text.decode("utf-16-le").strip("\x00"))
            start = stop
        # Tags are only touched once every string has decoded cleanly.
        tag_names = ("Title", "Author", "Copyright", "Description", "Rating")
        for tag_name, text in zip(tag_names, decoded):
            asf.tags[tag_name] = text

    def render(self, asf):
        """Serialize the standard attributes found in asf.tags.

        Only the first value of each name is written; a missing or
        empty value is written as a zero-length string.
        """
        chunks = []
        for tag_name in _standard_attribute_names:
            stored = asf.tags.get(tag_name, [])
            if stored and stored[0]:
                chunks.append(stored[0].encode("utf-16-le") + "\x00\x00")
            else:
                chunks.append("")
        chunk_lengths = [len(chunk) for chunk in chunks]
        body = struct.pack("<HHHHH", *chunk_lengths) + "".join(chunks)
        return self.GUID + struct.pack("<Q", 24 + len(body)) + body
class ExtendedContentDescriptionObject(BaseObject):
    """Extended content description.

    A 16-bit attribute count followed by that many records, each made
    of a name length, a name, a value type, a value length and a value.
    """
    GUID = "\x40\xA4\xD0\xD2\x07\xE3\xD2\x11\x97\xF0\x00\xA0\xC9\x5E\xA8\x50"

    def parse(self, asf, data, fileobj, size):
        """Decode every attribute record and append it to asf.tags."""
        super(ExtendedContentDescriptionObject, self).parse(
            asf, data, fileobj, size)
        asf.extended_content_description_obj = self
        (count,) = struct.unpack("<H", data[0:2])
        offset = 2
        for _ in range(count):
            (name_length,) = struct.unpack("<H", data[offset:offset + 2])
            offset += 2
            name_end = offset + name_length
            name = data[offset:name_end].decode("utf-16-le").strip("\x00")
            value_type, value_length = struct.unpack(
                "<HH", data[name_end:name_end + 4])
            value_start = name_end + 4
            offset = value_start + value_length
            raw_value = data[value_start:offset]
            attribute_class = _attribute_types[value_type]
            asf.tags.append((name, attribute_class(data=raw_value)))

    def render(self, asf):
        """Serialize asf.to_extended_content_description.

        The size field counts the 24-byte object header and the 2-byte
        attribute count in addition to the records.
        """
        pairs = asf.to_extended_content_description.items()
        records = "".join([attr.render(name) for (name, attr) in pairs])
        header = struct.pack("<QH", 26 + len(records), len(pairs))
        return self.GUID + header + records
class FilePropertiesObject(BaseObject):
    """File properties.

    Used here only to work out the length stored in asf.info.
    """
    GUID = "\xA1\xDC\xAB\x8C\x47\xA9\xCF\x11\x8E\xE4\x00\xC0\x0C\x20\x53\x65"

    def parse(self, asf, data, fileobj, size):
        """Set asf.info.length from the three 64-bit fields at offset 40."""
        super(FilePropertiesObject, self).parse(asf, data, fileobj, size)
        fields = struct.unpack("<QQQ", data[40:64])
        # The middle field is read but not used.
        duration, preroll = fields[0], fields[2]
        # The first field is scaled by 1e-7 and the third by 1e-3, and the
        # scaled third field is then subtracted from the first.
        asf.info.length = duration / 10000000.0 - preroll / 1000.0
class StreamPropertiesObject(BaseObject):
    """Stream properties.

    Fills in the channel count, sample rate and bitrate of asf.info.
    """
    GUID = "\x91\x07\xDC\xB7\xB7\xA9\xCF\x11\x8E\xE6\x00\xC0\x0C\x20\x53\x65"

    def parse(self, asf, data, fileobj, size):
        """Read three fields from the ten bytes at offset 56 into asf.info."""
        super(StreamPropertiesObject, self).parse(asf, data, fileobj, size)
        fields = struct.unpack("<HII", data[56:66])
        info = asf.info
        info.channels = fields[0]
        info.sample_rate = fields[1]
        # The stored rate is multiplied by 8 (presumably bytes -> bits
        # per second; confirm against the format specification).
        info.bitrate = fields[2] * 8
class HeaderExtensionObject(BaseObject):
    """Header extension.

    A container object: after 22 bytes of fixed fields its payload
    holds a sequence of nested objects, kept in self.objects.
    """
    GUID = "\xb5\x03\xbf_.\xa9\xcf\x11\x8e\xe3\x00\xc0\x0c Se"

    def __init__(self):
        super(HeaderExtensionObject, self).__init__()
        # ASF.save() creates a fresh instance when the file has no header
        # extension and appends to .objects without ever calling parse(),
        # so the list must exist from the start.
        self.objects = []

    def parse(self, asf, data, fileobj, size):
        """Parse every nested object found in the extension data.

        Raises ASFHeaderError if a nested object declares a size smaller
        than its own 24-byte header.
        """
        super(HeaderExtensionObject, self).parse(asf, data, fileobj, size)
        asf.header_extension_obj = self
        datasize, = struct.unpack("<I", data[18:22])
        datapos = 0
        self.objects = []
        while datapos < datasize:
            start = 22 + datapos
            guid, obj_size = struct.unpack("<16sQ", data[start:start + 24])
            if obj_size < 24:
                # A size of 0 would never advance datapos and the loop
                # would run forever on a damaged file.
                raise ASFHeaderError(
                    "Invalid object size in header extension.")
            if guid in _object_types:
                obj = _object_types[guid]()
            else:
                obj = UnknownObject(guid)
            obj.parse(asf, data[start + 24:start + obj_size], fileobj,
                      obj_size)
            self.objects.append(obj)
            datapos += obj_size

    def render(self, asf):
        """Serialize the extension together with all nested objects.

        The fixed part is the 24-byte object header followed by 16 + 6
        bytes of constant fields and the 32-bit length of the nested
        data.
        """
        data = "".join([obj.render(asf) for obj in self.objects])
        return (self.GUID + struct.pack("<Q", 24 + 16 + 6 + len(data)) +
                "\x11\xD2\xD3\xAB\xBA\xA9\xcf\x11" +
                "\x8E\xE6\x00\xC0\x0C\x20\x53\x65" +
                "\x06\x00" + struct.pack("<I", len(data)) + data)
class MetadataObject(BaseObject):
    """Metadata description.

    A 16-bit attribute count followed by records made of a 12-byte
    header, a name and a value; every record carries a stream number.
    """
    GUID = "\xea\xcb\xf8\xc5\xaf[wH\x84g\xaa\x8cD\xfaL\xca"

    def parse(self, asf, data, fileobj, size):
        """Decode every attribute record and append it to asf.tags."""
        super(MetadataObject, self).parse(asf, data, fileobj, size)
        asf.metadata_obj = self
        (count,) = struct.unpack("<H", data[0:2])
        offset = 2
        for _ in range(count):
            record_header = struct.unpack("<HHHHI", data[offset:offset + 12])
            (reserved, stream, name_length, value_type,
             value_length) = record_header
            name_start = offset + 12
            name_end = name_start + name_length
            name = data[name_start:name_end].decode("utf-16-le").strip("\x00")
            offset = name_end + value_length
            raw_value = data[name_end:offset]
            kwargs = dict(data=raw_value, stream=stream)
            if value_type == 2:
                # Type 2 values are parsed in their two-byte form here.
                kwargs['dword'] = False
            attribute_class = _attribute_types[value_type]
            asf.tags.append((name, attribute_class(**kwargs)))

    def render(self, asf):
        """Serialize asf.to_metadata.

        The size field counts the 24-byte object header and the 2-byte
        attribute count in addition to the records.
        """
        pairs = asf.to_metadata.items()
        records = "".join([attr.render_m(name) for (name, attr) in pairs])
        header = struct.pack("<QH", 26 + len(records), len(pairs))
        return self.GUID + header + records
class MetadataLibraryObject(BaseObject):
    """Metadata library description.

    A 16-bit attribute count followed by records made of a 12-byte
    header, a name and a value; every record carries a language and a
    stream number.
    """
    GUID = "\x94\x1c#D\x98\x94\xd1I\xa1A\x1d\x13NEpT"

    def parse(self, asf, data, fileobj, size):
        """Decode every attribute record and append it to asf.tags."""
        super(MetadataLibraryObject, self).parse(asf, data, fileobj, size)
        asf.metadata_library_obj = self
        (count,) = struct.unpack("<H", data[0:2])
        offset = 2
        for _ in range(count):
            record_header = struct.unpack("<HHHHI", data[offset:offset + 12])
            (language, stream, name_length, value_type,
             value_length) = record_header
            name_start = offset + 12
            name_end = name_start + name_length
            name = data[name_start:name_end].decode("utf-16-le").strip("\x00")
            offset = name_end + value_length
            raw_value = data[name_end:offset]
            kwargs = dict(data=raw_value, language=language, stream=stream)
            if value_type == 2:
                # Type 2 values are parsed in their two-byte form here.
                kwargs['dword'] = False
            attribute_class = _attribute_types[value_type]
            asf.tags.append((name, attribute_class(**kwargs)))

    def render(self, asf):
        """Serialize asf.to_metadata_library, a list of (name, attr) pairs.

        The size field counts the 24-byte object header and the 2-byte
        attribute count in addition to the records.
        """
        pairs = asf.to_metadata_library
        records = "".join([attr.render_ml(name) for (name, attr) in pairs])
        header = struct.pack("<QH", 26 + len(records), len(pairs))
        return self.GUID + header + records
# Maps an object GUID to the class that knows how to parse it; any GUID
# missing from this table is handled as an UnknownObject.
_object_types = dict(
    (object_class.GUID, object_class)
    for object_class in (
        ExtendedContentDescriptionObject,
        ContentDescriptionObject,
        FilePropertiesObject,
        StreamPropertiesObject,
        HeaderExtensionObject,
        MetadataLibraryObject,
        MetadataObject,
    )
)
class ASF(FileType):
    """An ASF file, probably containing WMA or WMV."""

    _mimes = ["audio/x-ms-wma", "audio/x-ms-wmv", "video/x-ms-asf",
              "audio/x-wma", "video/x-wmv"]

    def load(self, filename):
        """Read the header objects of filename into info and tags.

        Raises ASFHeaderError if the file does not start with an ASF
        header object.
        """
        self.filename = filename
        fileobj = open(filename, "rb")
        try:
            self.size = 0
            # size1/size2/offset1/offset2 are initialised here but not
            # used anywhere else in this class.
            self.size1 = 0
            self.size2 = 0
            self.offset1 = 0
            self.offset2 = 0
            self.num_objects = 0
            self.info = ASFInfo()
            self.tags = ASFTags()
            self.__read_file(fileobj)
        finally:
            fileobj.close()

    def save(self):
        """Write the tags back into the header of self.filename.

        The header is rendered again from scratch; the file grows or
        shrinks by the difference to the old header size.
        """
        # Move attributes to the right objects
        self.to_extended_content_description = {}
        self.to_metadata = {}
        self.to_metadata_library = []
        for name, value in self.tags:
            if name in _standard_attribute_names:
                # Written by the content description object instead.
                continue
            if (value.language is None and value.stream is None and
                    name not in self.to_extended_content_description):
                self.to_extended_content_description[name] = value
            elif (value.language is None and value.stream is not None and
                    name not in self.to_metadata):
                self.to_metadata[name] = value
            else:
                # Everything else, including repeated names, goes here.
                self.to_metadata_library.append((name, value))

        # Add missing objects
        if not self.content_description_obj:
            self.content_description_obj = \
                ContentDescriptionObject()
            self.objects.append(self.content_description_obj)
        if not self.extended_content_description_obj:
            self.extended_content_description_obj = \
                ExtendedContentDescriptionObject()
            self.objects.append(self.extended_content_description_obj)
        if not self.header_extension_obj:
            self.header_extension_obj = \
                HeaderExtensionObject()
            self.objects.append(self.header_extension_obj)
        if not self.metadata_obj:
            self.metadata_obj = \
                MetadataObject()
            self.header_extension_obj.objects.append(self.metadata_obj)
        if not self.metadata_library_obj:
            self.metadata_library_obj = \
                MetadataLibraryObject()
            self.header_extension_obj.objects.append(self.metadata_library_obj)

        # Render the header: 16-byte GUID, 64-bit size, 32-bit object
        # count and two constant bytes (30 bytes) before the objects.
        data = "".join([obj.render(self) for obj in self.objects])
        data = (HeaderObject.GUID +
                struct.pack("<QL", len(data) + 30, len(self.objects)) +
                "\x01\x02" + data)

        fileobj = open(self.filename, "rb+")
        try:
            size = len(data)
            if size > self.size:
                insert_bytes(fileobj, size - self.size, self.size)
            if size < self.size:
                delete_bytes(fileobj, self.size - size, 0)
            fileobj.seek(0)
            fileobj.write(data)
        finally:
            fileobj.close()

        # Remember the new header layout.  Without this a second save()
        # on the same instance would resize the file against the old
        # header size and corrupt it.
        self.size = size
        self.num_objects = len(self.objects)

    def __read_file(self, fileobj):
        """Check the 30-byte file header, then read every header object."""
        header = fileobj.read(30)
        if len(header) != 30 or header[:16] != HeaderObject.GUID:
            raise ASFHeaderError("Not an ASF file.")

        self.extended_content_description_obj = None
        self.content_description_obj = None
        self.header_extension_obj = None
        self.metadata_obj = None
        self.metadata_library_obj = None

        self.size, self.num_objects = struct.unpack("<QL", header[16:28])
        self.objects = []
        for i in range(self.num_objects):
            self.__read_object(fileobj)

    def __read_object(self, fileobj):
        """Read one header object from fileobj and append it to objects.

        Raises ASFHeaderError if the object declares a size smaller than
        its own 24-byte header.
        """
        guid, size = struct.unpack("<16sQ", fileobj.read(24))
        if size < 24:
            # read() with a negative count would swallow the rest of the
            # file instead of this object's payload.
            raise ASFHeaderError("Invalid object size in header.")
        if guid in _object_types:
            obj = _object_types[guid]()
        else:
            obj = UnknownObject(guid)
        data = fileobj.read(size - 24)
        obj.parse(self, data, fileobj, size)
        self.objects.append(obj)

    @staticmethod
    def score(filename, fileobj, header):
        """Return 2 if header starts with the ASF header GUID, else 0."""
        return header.startswith(HeaderObject.GUID) * 2
# Module-level alias for the ASF class; it is exported through __all__.
Open = ASF