Extend scan time to a full second.
[pyTivo/wmcbrine.git] / mutagen / asf.py
blob4a5ddb2f726b813ad5e0404a8b72d58635792ce1
1 # Copyright 2006-2007 Lukas Lalinsky
2 # Copyright 2005-2006 Joe Wreschnig
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License version 2 as
6 # published by the Free Software Foundation.
8 # $Id: asf.py 4224 2007-12-03 09:01:49Z luks $
10 """Read and write ASF (Window Media Audio) files."""
12 __all__ = ["ASF", "Open"]
14 import struct
15 from mutagen import FileType, Metadata
16 from mutagen._util import insert_bytes, delete_bytes, DictMixin
18 class error(IOError): pass
19 class ASFError(error): pass
20 class ASFHeaderError(error): pass
23 class ASFInfo(object):
24 """ASF stream information."""
26 def __init__(self):
27 self.length = 0.0
28 self.sample_rate = 0
29 self.bitrate = 0
30 self.channels = 0
32 def pprint(self):
33 s = "Windows Media Audio %d bps, %s Hz, %d channels, %.2f seconds" % (
34 self.bitrate, self.sample_rate, self.channels, self.length)
35 return s
38 class ASFTags(list, DictMixin, Metadata):
39 """Dictionary containing ASF attributes."""
41 def pprint(self):
42 return "\n".join(["%s=%s" % (k, v) for k, v in self])
44 def __getitem__(self, key):
45 """A list of values for the key.
47 This is a copy, so comment['title'].append('a title') will not
48 work.
50 """
51 values = [value for (k, value) in self if k == key]
52 if not values: raise KeyError, key
53 else: return values
55 def __delitem__(self, key):
56 """Delete all values associated with the key."""
57 to_delete = filter(lambda x: x[0] == key, self)
58 if not to_delete: raise KeyError, key
59 else: map(self.remove, to_delete)
61 def __contains__(self, key):
62 """Return true if the key has any values."""
63 for k, value in self:
64 if k == key: return True
65 else: return False
67 def __setitem__(self, key, values):
68 """Set a key's value or values.
70 Setting a value overwrites all old ones. The value may be a
71 list of Unicode or UTF-8 strings, or a single Unicode or UTF-8
72 string.
74 """
75 if not isinstance(values, list):
76 values = [values]
77 try: del(self[key])
78 except KeyError: pass
79 for value in values:
80 if key in _standard_attribute_names:
81 value = unicode(value)
82 elif not isinstance(value, ASFBaseAttribute):
83 if isinstance(value, basestring):
84 value = ASFUnicodeAttribute(value)
85 elif isinstance(value, bool):
86 value = ASFBoolAttribute(value)
87 elif isinstance(value, int):
88 value = ASFDWordAttribute(value)
89 elif isinstance(value, long):
90 value = ASFQWordAttribute(value)
91 self.append((key, value))
93 def keys(self):
94 """Return all keys in the comment."""
95 return self and set(zip(*self)[0])
97 def as_dict(self):
98 """Return a copy of the comment data in a real dict."""
99 d = {}
100 for key, value in self:
101 d.setdefault(key, []).append(value)
102 return d
105 class ASFBaseAttribute(object):
106 """Generic attribute."""
107 TYPE = None
109 def __init__(self, value=None, data=None, language=None,
110 stream=None, **kwargs):
111 self.language = language
112 self.stream = stream
113 if data:
114 self.value = self.parse(data, **kwargs)
115 else:
116 self.value = value
118 def data_size(self):
119 raise NotImplementedError
121 def __repr__(self):
122 name = "%s(%r" % (type(self).__name__, self.value)
123 if self.language:
124 name += ", language=%d" % self.language
125 if self.stream:
126 name += ", stream=%d" % self.stream
127 name += ")"
128 return name
130 def render(self, name):
131 name = name.encode("utf-16-le") + "\x00\x00"
132 data = self._render()
133 return (struct.pack("<H", len(name)) + name +
134 struct.pack("<HH", self.TYPE, len(data)) + data)
136 def render_m(self, name):
137 name = name.encode("utf-16-le") + "\x00\x00"
138 if self.TYPE == 2:
139 data = self._render(dword=False)
140 else:
141 data = self._render()
142 return (struct.pack("<HHHHI", 0, self.stream or 0, len(name),
143 self.TYPE, len(data)) + name + data)
145 def render_ml(self, name):
146 name = name.encode("utf-16-le") + "\x00\x00"
147 if self.TYPE == 2:
148 data = self._render(dword=False)
149 else:
150 data = self._render()
151 return (struct.pack("<HHHHI", self.language or 0, self.stream or 0,
152 len(name), self.TYPE, len(data)) + name + data)
154 class ASFUnicodeAttribute(ASFBaseAttribute):
155 """Unicode string attribute."""
156 TYPE = 0x0000
158 def parse(self, data):
159 return data.decode("utf-16-le").strip("\x00")
161 def _render(self):
162 return self.value.encode("utf-16-le") + "\x00\x00"
164 def data_size(self):
165 return len(self.value) * 2 + 2
167 def __str__(self):
168 return self.value
170 def __cmp__(self, other):
171 return cmp(unicode(self), other)
173 __hash__ = ASFBaseAttribute.__hash__
176 class ASFByteArrayAttribute(ASFBaseAttribute):
177 """Byte array attribute."""
178 TYPE = 0x0001
180 def parse(self, data):
181 return data
183 def _render(self):
184 return self.value
186 def data_size(self):
187 return len(self.value)
189 def __str__(self):
190 return "[binary data (%s bytes)]" % len(self.value)
192 def __cmp__(self, other):
193 return cmp(str(self), other)
195 __hash__ = ASFBaseAttribute.__hash__
198 class ASFBoolAttribute(ASFBaseAttribute):
199 """Bool attribute."""
200 TYPE = 0x0002
202 def parse(self, data, dword=True):
203 if dword:
204 return struct.unpack("<I", data)[0] == 1
205 else:
206 return struct.unpack("<H", data)[0] == 1
208 def _render(self, dword=True):
209 if dword:
210 return struct.pack("<I", int(self.value))
211 else:
212 return struct.pack("<H", int(self.value))
214 def data_size(self):
215 return 4
217 def __bool__(self):
218 return self.value
220 def __str__(self):
221 return str(self.value)
223 def __cmp__(self, other):
224 return cmp(bool(self), other)
226 __hash__ = ASFBaseAttribute.__hash__
229 class ASFDWordAttribute(ASFBaseAttribute):
230 """DWORD attribute."""
231 TYPE = 0x0003
233 def parse(self, data):
234 return struct.unpack("<L", data)[0]
236 def _render(self):
237 return struct.pack("<L", self.value)
239 def data_size(self):
240 return 4
242 def __int__(self):
243 return self.value
245 def __str__(self):
246 return str(self.value)
248 def __cmp__(self, other):
249 return cmp(int(self), other)
251 __hash__ = ASFBaseAttribute.__hash__
254 class ASFQWordAttribute(ASFBaseAttribute):
255 """QWORD attribute."""
256 TYPE = 0x0004
258 def parse(self, data):
259 return struct.unpack("<Q", data)[0]
261 def _render(self):
262 return struct.pack("<Q", self.value)
264 def data_size(self):
265 return 8
267 def __int__(self):
268 return self.value
270 def __str__(self):
271 return str(self.value)
273 def __cmp__(self, other):
274 return cmp(int(self), other)
276 __hash__ = ASFBaseAttribute.__hash__
279 class ASFWordAttribute(ASFBaseAttribute):
280 """WORD attribute."""
281 TYPE = 0x0005
283 def parse(self, data):
284 return struct.unpack("<H", data)[0]
286 def _render(self):
287 return struct.pack("<H", self.value)
289 def data_size(self):
290 return 2
292 def __int__(self):
293 return self.value
295 def __str__(self):
296 return str(self.value)
298 def __cmp__(self, other):
299 return cmp(int(self), other)
301 __hash__ = ASFBaseAttribute.__hash__
304 class ASFGUIDAttribute(ASFBaseAttribute):
305 """GUID attribute."""
306 TYPE = 0x0006
308 def parse(self, data):
309 return data
311 def _render(self):
312 return self.value
314 def data_size(self):
315 return len(self.value)
317 def __str__(self):
318 return self.value
320 def __cmp__(self, other):
321 return cmp(str(self), other)
323 __hash__ = ASFBaseAttribute.__hash__
326 UNICODE = ASFUnicodeAttribute.TYPE
327 BYTEARRAY = ASFByteArrayAttribute.TYPE
328 BOOL = ASFBoolAttribute.TYPE
329 DWORD = ASFDWordAttribute.TYPE
330 QWORD = ASFQWordAttribute.TYPE
331 WORD = ASFWordAttribute.TYPE
332 GUID = ASFGUIDAttribute.TYPE
334 def ASFValue(value, kind, **kwargs):
335 for t, c in _attribute_types.items():
336 if kind == t:
337 return c(value=value, **kwargs)
338 raise ValueError("Unknown value type")
341 _attribute_types = {
342 ASFUnicodeAttribute.TYPE: ASFUnicodeAttribute,
343 ASFByteArrayAttribute.TYPE: ASFByteArrayAttribute,
344 ASFBoolAttribute.TYPE: ASFBoolAttribute,
345 ASFDWordAttribute.TYPE: ASFDWordAttribute,
346 ASFQWordAttribute.TYPE: ASFQWordAttribute,
347 ASFWordAttribute.TYPE: ASFWordAttribute,
348 ASFGUIDAttribute.TYPE: ASFGUIDAttribute,
352 _standard_attribute_names = [
353 "Title",
354 "Author",
355 "Copyright",
356 "Description",
357 "Rating"
361 class BaseObject(object):
362 """Base ASF object."""
363 GUID = None
365 def parse(self, asf, data, fileobj, size):
366 self.data = data
368 def render(self, asf):
369 data = self.GUID + struct.pack("<Q", len(self.data) + 24) + self.data
370 size = len(data)
371 return data
374 class UnknownObject(BaseObject):
375 """Unknown ASF object."""
376 def __init__(self, guid):
377 self.GUID = guid
380 class HeaderObject(object):
381 """ASF header."""
382 GUID = "\x30\x26\xB2\x75\x8E\x66\xCF\x11\xA6\xD9\x00\xAA\x00\x62\xCE\x6C"
385 class ContentDescriptionObject(BaseObject):
386 """Content description."""
387 GUID = "\x33\x26\xB2\x75\x8E\x66\xCF\x11\xA6\xD9\x00\xAA\x00\x62\xCE\x6C"
389 def parse(self, asf, data, fileobj, size):
390 super(ContentDescriptionObject, self).parse(asf, data, fileobj, size)
391 asf.content_description_obj = self
392 lengths = struct.unpack("<HHHHH", data[:10])
393 texts = []
394 pos = 10
395 for length in lengths:
396 end = pos + length
397 if length > 0:
398 texts.append(data[pos:end].decode("utf-16-le").strip("\x00"))
399 else:
400 texts.append(None)
401 pos = end
402 title, author, copyright, desc, rating = texts
403 for key, value in dict(
404 Title=title,
405 Author=author,
406 Copyright=copyright,
407 Description=desc,
408 Rating=rating).items():
409 if value is not None:
410 asf.tags[key] = value
412 def render(self, asf):
413 def render_text(name):
414 value = asf.tags.get(name, [])
415 if value:
416 return value[0].encode("utf-16-le") + "\x00\x00"
417 else:
418 return ""
419 texts = map(render_text, _standard_attribute_names)
420 data = struct.pack("<HHHHH", *map(len, texts)) + "".join(texts)
421 return self.GUID + struct.pack("<Q", 24 + len(data)) + data
424 class ExtendedContentDescriptionObject(BaseObject):
425 """Extended content description."""
426 GUID = "\x40\xA4\xD0\xD2\x07\xE3\xD2\x11\x97\xF0\x00\xA0\xC9\x5E\xA8\x50"
428 def parse(self, asf, data, fileobj, size):
429 super(ExtendedContentDescriptionObject, self).parse(asf, data, fileobj, size)
430 asf.extended_content_description_obj = self
431 num_attributes, = struct.unpack("<H", data[0:2])
432 pos = 2
433 for i in range(num_attributes):
434 name_length, = struct.unpack("<H", data[pos:pos+2])
435 pos += 2
436 name = data[pos:pos+name_length].decode("utf-16-le").strip("\x00")
437 pos += name_length
438 value_type, value_length = struct.unpack("<HH", data[pos:pos+4])
439 pos += 4
440 value = data[pos:pos+value_length]
441 pos += value_length
442 attr = _attribute_types[value_type](data=value)
443 asf.tags.append((name, attr))
445 def render(self, asf):
446 attrs = asf.to_extended_content_description.items()
447 data = "".join([attr.render(name) for (name, attr) in attrs])
448 data = struct.pack("<QH", 26 + len(data), len(attrs)) + data
449 return self.GUID + data
452 class FilePropertiesObject(BaseObject):
453 """File properties."""
454 GUID = "\xA1\xDC\xAB\x8C\x47\xA9\xCF\x11\x8E\xE4\x00\xC0\x0C\x20\x53\x65"
456 def parse(self, asf, data, fileobj, size):
457 super(FilePropertiesObject, self).parse(asf, data, fileobj, size)
458 length, _, preroll = struct.unpack("<QQQ", data[40:64])
459 asf.info.length = length / 10000000.0 - preroll / 1000.0
462 class StreamPropertiesObject(BaseObject):
463 """Stream properties."""
464 GUID = "\x91\x07\xDC\xB7\xB7\xA9\xCF\x11\x8E\xE6\x00\xC0\x0C\x20\x53\x65"
466 def parse(self, asf, data, fileobj, size):
467 super(StreamPropertiesObject, self).parse(asf, data, fileobj, size)
468 channels, sample_rate, bitrate = struct.unpack("<HII", data[56:66])
469 asf.info.channels = channels
470 asf.info.sample_rate = sample_rate
471 asf.info.bitrate = bitrate * 8
474 class HeaderExtensionObject(BaseObject):
475 """Header extension."""
476 GUID = "\xb5\x03\xbf_.\xa9\xcf\x11\x8e\xe3\x00\xc0\x0c Se"
478 def parse(self, asf, data, fileobj, size):
479 super(HeaderExtensionObject, self).parse(asf, data, fileobj, size)
480 asf.header_extension_obj = self
481 datasize, = struct.unpack("<I", data[18:22])
482 datapos = 0
483 self.objects = []
484 while datapos < datasize:
485 guid, size = struct.unpack("<16sQ", data[22+datapos:22+datapos+24])
486 if guid in _object_types:
487 obj = _object_types[guid]()
488 else:
489 obj = UnknownObject(guid)
490 obj.parse(asf, data[22+datapos+24:22+datapos+size], fileobj, size)
491 self.objects.append(obj)
492 datapos += size
494 def render(self, asf):
495 data = "".join([obj.render(asf) for obj in self.objects])
496 return (self.GUID + struct.pack("<Q", 24 + 16 + 6 + len(data)) +
497 "\x11\xD2\xD3\xAB\xBA\xA9\xcf\x11" +
498 "\x8E\xE6\x00\xC0\x0C\x20\x53\x65" +
499 "\x06\x00" + struct.pack("<I", len(data)) + data)
502 class MetadataObject(BaseObject):
503 """Metadata description."""
504 GUID = "\xea\xcb\xf8\xc5\xaf[wH\x84g\xaa\x8cD\xfaL\xca"
506 def parse(self, asf, data, fileobj, size):
507 super(MetadataObject, self).parse(asf, data, fileobj, size)
508 asf.metadata_obj = self
509 num_attributes, = struct.unpack("<H", data[0:2])
510 pos = 2
511 for i in range(num_attributes):
512 (reserved, stream, name_length, value_type,
513 value_length) = struct.unpack("<HHHHI", data[pos:pos+12])
514 pos += 12
515 name = data[pos:pos+name_length].decode("utf-16-le").strip("\x00")
516 pos += name_length
517 value = data[pos:pos+value_length]
518 pos += value_length
519 args = {'data': value, 'stream': stream}
520 if value_type == 2:
521 args['dword'] = False
522 attr = _attribute_types[value_type](**args)
523 asf.tags.append((name, attr))
525 def render(self, asf):
526 attrs = asf.to_metadata.items()
527 data = "".join([attr.render_m(name) for (name, attr) in attrs])
528 return (self.GUID + struct.pack("<QH", 26 + len(data), len(attrs)) +
529 data)
532 class MetadataLibraryObject(BaseObject):
533 """Metadata library description."""
534 GUID = "\x94\x1c#D\x98\x94\xd1I\xa1A\x1d\x13NEpT"
536 def parse(self, asf, data, fileobj, size):
537 super(MetadataLibraryObject, self).parse(asf, data, fileobj, size)
538 asf.metadata_library_obj = self
539 num_attributes, = struct.unpack("<H", data[0:2])
540 pos = 2
541 for i in range(num_attributes):
542 (language, stream, name_length, value_type,
543 value_length) = struct.unpack("<HHHHI", data[pos:pos+12])
544 pos += 12
545 name = data[pos:pos+name_length].decode("utf-16-le").strip("\x00")
546 pos += name_length
547 value = data[pos:pos+value_length]
548 pos += value_length
549 args = {'data': value, 'language': language, 'stream': stream}
550 if value_type == 2:
551 args['dword'] = False
552 attr = _attribute_types[value_type](**args)
553 asf.tags.append((name, attr))
555 def render(self, asf):
556 attrs = asf.to_metadata_library
557 data = "".join([attr.render_ml(name) for (name, attr) in attrs])
558 return (self.GUID + struct.pack("<QH", 26 + len(data), len(attrs)) +
559 data)
562 _object_types = {
563 ExtendedContentDescriptionObject.GUID: ExtendedContentDescriptionObject,
564 ContentDescriptionObject.GUID: ContentDescriptionObject,
565 FilePropertiesObject.GUID: FilePropertiesObject,
566 StreamPropertiesObject.GUID: StreamPropertiesObject,
567 HeaderExtensionObject.GUID: HeaderExtensionObject,
568 MetadataLibraryObject.GUID: MetadataLibraryObject,
569 MetadataObject.GUID: MetadataObject,
573 class ASF(FileType):
574 """An ASF file, probably containing WMA or WMV."""
576 _mimes = ["audio/x-ms-wma", "audio/x-ms-wmv", "video/x-ms-asf",
577 "audio/x-wma", "video/x-wmv"]
579 def load(self, filename):
580 self.filename = filename
581 fileobj = open(filename, "rb")
582 try:
583 self.size = 0
584 self.size1 = 0
585 self.size2 = 0
586 self.offset1 = 0
587 self.offset2 = 0
588 self.num_objects = 0
589 self.info = ASFInfo()
590 self.tags = ASFTags()
591 self.__read_file(fileobj)
592 finally:
593 fileobj.close()
595 def save(self):
596 # Move attributes to the right objects
597 self.to_extended_content_description = {}
598 self.to_metadata = {}
599 self.to_metadata_library = []
600 for name, value in self.tags:
601 if name in _standard_attribute_names:
602 continue
603 large_value = value.data_size() > 0xFFFF
604 if (value.language is None and value.stream is None and
605 name not in self.to_extended_content_description and
606 not large_value):
607 self.to_extended_content_description[name] = value
608 elif (value.language is None and value.stream is not None and
609 name not in self.to_metadata and not large_value):
610 self.to_metadata[name] = value
611 else:
612 self.to_metadata_library.append((name, value))
614 # Add missing objects
615 if not self.content_description_obj:
616 self.content_description_obj = \
617 ContentDescriptionObject()
618 self.objects.append(self.content_description_obj)
619 if not self.extended_content_description_obj:
620 self.extended_content_description_obj = \
621 ExtendedContentDescriptionObject()
622 self.objects.append(self.extended_content_description_obj)
623 if not self.header_extension_obj:
624 self.header_extension_obj = \
625 HeaderExtensionObject()
626 self.objects.append(self.header_extension_obj)
627 if not self.metadata_obj:
628 self.metadata_obj = \
629 MetadataObject()
630 self.header_extension_obj.objects.append(self.metadata_obj)
631 if not self.metadata_library_obj:
632 self.metadata_library_obj = \
633 MetadataLibraryObject()
634 self.header_extension_obj.objects.append(self.metadata_library_obj)
636 # Render the header
637 data = "".join([obj.render(self) for obj in self.objects])
638 data = (HeaderObject.GUID +
639 struct.pack("<QL", len(data) + 30, len(self.objects)) +
640 "\x01\x02" + data)
642 fileobj = open(self.filename, "rb+")
643 try:
644 size = len(data)
645 if size > self.size:
646 insert_bytes(fileobj, size - self.size, self.size)
647 if size < self.size:
648 delete_bytes(fileobj, self.size - size, 0)
649 fileobj.seek(0)
650 fileobj.write(data)
651 finally:
652 fileobj.close()
654 def __read_file(self, fileobj):
655 header = fileobj.read(30)
656 if len(header) != 30 or header[:16] != HeaderObject.GUID:
657 raise ASFHeaderError, "Not an ASF file."
659 self.extended_content_description_obj = None
660 self.content_description_obj = None
661 self.header_extension_obj = None
662 self.metadata_obj = None
663 self.metadata_library_obj = None
665 self.size, self.num_objects = struct.unpack("<QL", header[16:28])
666 self.objects = []
667 for i in range(self.num_objects):
668 self.__read_object(fileobj)
670 def __read_object(self, fileobj):
671 guid, size = struct.unpack("<16sQ", fileobj.read(24))
672 if guid in _object_types:
673 obj = _object_types[guid]()
674 else:
675 obj = UnknownObject(guid)
676 data = fileobj.read(size - 24)
677 obj.parse(self, data, fileobj, size)
678 self.objects.append(obj)
680 def score(filename, fileobj, header):
681 return header.startswith(HeaderObject.GUID) * 2
682 score = staticmethod(score)
684 Open = ASF