1 # Copyright 2006-2007 Lukas Lalinsky
2 # Copyright 2005-2006 Joe Wreschnig
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License version 2 as
6 # published by the Free Software Foundation.
8 # $Id: asf.py 4224 2007-12-03 09:01:49Z luks $
10 """Read and write ASF (Window Media Audio) files."""
12 __all__
= ["ASF", "Open"]
15 from mutagen
import FileType
, Metadata
16 from mutagen
._util
import insert_bytes
, delete_bytes
, DictMixin
18 class error(IOError): pass
19 class ASFError(error
): pass
20 class ASFHeaderError(error
): pass
23 class ASFInfo(object):
24 """ASF stream information."""
33 s
= "Windows Media Audio %d bps, %s Hz, %d channels, %.2f seconds" % (
34 self
.bitrate
, self
.sample_rate
, self
.channels
, self
.length
)
38 class ASFTags(list, DictMixin
, Metadata
):
39 """Dictionary containing ASF attributes."""
42 return "\n".join(["%s=%s" % (k
, v
) for k
, v
in self
])
44 def __getitem__(self
, key
):
45 """A list of values for the key.
47 This is a copy, so comment['title'].append('a title') will not
51 values
= [value
for (k
, value
) in self
if k
== key
]
52 if not values
: raise KeyError, key
55 def __delitem__(self
, key
):
56 """Delete all values associated with the key."""
57 to_delete
= filter(lambda x
: x
[0] == key
, self
)
58 if not to_delete
: raise KeyError, key
59 else: map(self
.remove
, to_delete
)
61 def __contains__(self
, key
):
62 """Return true if the key has any values."""
64 if k
== key
: return True
67 def __setitem__(self
, key
, values
):
68 """Set a key's value or values.
70 Setting a value overwrites all old ones. The value may be a
71 list of Unicode or UTF-8 strings, or a single Unicode or UTF-8
75 if not isinstance(values
, list):
80 if key
in _standard_attribute_names
:
81 value
= unicode(value
)
82 elif not isinstance(value
, ASFBaseAttribute
):
83 if isinstance(value
, basestring
):
84 value
= ASFUnicodeAttribute(value
)
85 elif isinstance(value
, bool):
86 value
= ASFBoolAttribute(value
)
87 elif isinstance(value
, int):
88 value
= ASFDWordAttribute(value
)
89 elif isinstance(value
, long):
90 value
= ASFQWordAttribute(value
)
91 self
.append((key
, value
))
94 """Return all keys in the comment."""
95 return self
and set(zip(*self
)[0])
98 """Return a copy of the comment data in a real dict."""
100 for key
, value
in self
:
101 d
.setdefault(key
, []).append(value
)
105 class ASFBaseAttribute(object):
106 """Generic attribute."""
109 def __init__(self
, value
=None, data
=None, language
=None,
110 stream
=None, **kwargs
):
111 self
.language
= language
114 self
.value
= self
.parse(data
, **kwargs
)
119 name
= "%s(%r" % (type(self
).__name
__, self
.value
)
121 name
+= ", language=%d" % self
.language
123 name
+= ", stream=%d" % self
.stream
127 def render(self
, name
):
128 name
= name
.encode("utf-16-le") + "\x00\x00"
129 data
= self
._render
()
130 return (struct
.pack("<H", len(name
)) + name
+
131 struct
.pack("<HH", self
.TYPE
, len(data
)) + data
)
133 def render_m(self
, name
):
134 name
= name
.encode("utf-16-le") + "\x00\x00"
136 data
= self
._render
(dword
=False)
138 data
= self
._render
()
139 return (struct
.pack("<HHHHI", 0, self
.stream
or 0, len(name
),
140 self
.TYPE
, len(data
)) + name
+ data
)
142 def render_ml(self
, name
):
143 name
= name
.encode("utf-16-le") + "\x00\x00"
145 data
= self
._render
(dword
=False)
147 data
= self
._render
()
148 return (struct
.pack("<HHHHI", self
.language
or 0, self
.stream
or 0,
149 len(name
), self
.TYPE
, len(data
)) + name
+ data
)
151 class ASFUnicodeAttribute(ASFBaseAttribute
):
152 """Unicode string attribute."""
155 def parse(self
, data
):
156 return data
.decode("utf-16-le").strip("\x00")
159 return self
.value
.encode("utf-16-le") + "\x00\x00"
164 def __cmp__(self
, other
):
165 return cmp(unicode(self
), other
)
168 class ASFByteArrayAttribute(ASFBaseAttribute
):
169 """Byte array attribute."""
172 def parse(self
, data
):
179 return "[binary data (%s bytes)]" % len(self
.value
)
181 def __cmp__(self
, other
):
182 return cmp(str(self
), other
)
185 class ASFBoolAttribute(ASFBaseAttribute
):
186 """Bool attribute."""
189 def parse(self
, data
, dword
=True):
191 return struct
.unpack("<I", data
)[0] == 1
193 return struct
.unpack("<H", data
)[0] == 1
195 def _render(self
, dword
=True):
197 return struct
.pack("<I", int(self
.value
))
199 return struct
.pack("<H", int(self
.value
))
205 return str(self
.value
)
207 def __cmp__(self
, other
):
208 return cmp(bool(self
), other
)
211 class ASFDWordAttribute(ASFBaseAttribute
):
212 """DWORD attribute."""
215 def parse(self
, data
):
216 return struct
.unpack("<L", data
)[0]
219 return struct
.pack("<L", self
.value
)
225 return str(self
.value
)
227 def __cmp__(self
, other
):
228 return cmp(int(self
), other
)
231 class ASFQWordAttribute(ASFBaseAttribute
):
232 """QWORD attribute."""
235 def parse(self
, data
):
236 return struct
.unpack("<Q", data
)[0]
239 return struct
.pack("<Q", self
.value
)
245 return str(self
.value
)
247 def __cmp__(self
, other
):
248 return cmp(int(self
), other
)
251 class ASFWordAttribute(ASFBaseAttribute
):
252 """WORD attribute."""
255 def parse(self
, data
):
256 return struct
.unpack("<H", data
)[0]
259 return struct
.pack("<H", self
.value
)
265 return str(self
.value
)
267 def __cmp__(self
, other
):
268 return cmp(int(self
), other
)
271 class ASFGUIDAttribute(ASFBaseAttribute
):
272 """GUID attribute."""
275 def parse(self
, data
):
284 def __cmp__(self
, other
):
285 return cmp(str(self
), other
)
288 UNICODE
= ASFUnicodeAttribute
.TYPE
289 BYTEARRAY
= ASFByteArrayAttribute
.TYPE
290 BOOL
= ASFBoolAttribute
.TYPE
291 DWORD
= ASFDWordAttribute
.TYPE
292 QWORD
= ASFQWordAttribute
.TYPE
293 WORD
= ASFWordAttribute
.TYPE
294 GUID
= ASFGUIDAttribute
.TYPE
296 def ASFValue(value
, kind
, **kwargs
):
297 for t
, c
in _attribute_types
.items():
299 return c(value
=value
, **kwargs
)
300 raise ValueError("Unknown value type")
304 ASFUnicodeAttribute
.TYPE
: ASFUnicodeAttribute
,
305 ASFByteArrayAttribute
.TYPE
: ASFByteArrayAttribute
,
306 ASFBoolAttribute
.TYPE
: ASFBoolAttribute
,
307 ASFDWordAttribute
.TYPE
: ASFDWordAttribute
,
308 ASFQWordAttribute
.TYPE
: ASFQWordAttribute
,
309 ASFWordAttribute
.TYPE
: ASFWordAttribute
,
310 ASFGUIDAttribute
.TYPE
: ASFGUIDAttribute
,
314 _standard_attribute_names
= [
323 class BaseObject(object):
324 """Base ASF object."""
327 def parse(self
, asf
, data
, fileobj
, size
):
330 def render(self
, asf
):
331 data
= self
.GUID
+ struct
.pack("<Q", len(self
.data
) + 24) + self
.data
336 class UnknownObject(BaseObject
):
337 """Unknown ASF object."""
338 def __init__(self
, guid
):
342 class HeaderObject(object):
344 GUID
= "\x30\x26\xB2\x75\x8E\x66\xCF\x11\xA6\xD9\x00\xAA\x00\x62\xCE\x6C"
347 class ContentDescriptionObject(BaseObject
):
348 """Content description."""
349 GUID
= "\x33\x26\xB2\x75\x8E\x66\xCF\x11\xA6\xD9\x00\xAA\x00\x62\xCE\x6C"
351 def parse(self
, asf
, data
, fileobj
, size
):
352 super(ContentDescriptionObject
, self
).parse(asf
, data
, fileobj
, size
)
353 asf
.content_description_obj
= self
354 lengths
= struct
.unpack("<HHHHH", data
[:10])
357 for length
in lengths
:
359 texts
.append(data
[pos
:end
].decode("utf-16-le").strip("\x00"))
361 (asf
.tags
["Title"], asf
.tags
["Author"], asf
.tags
["Copyright"],
362 asf
.tags
["Description"], asf
.tags
["Rating"]) = texts
364 def render(self
, asf
):
365 def render_text(name
):
366 value
= asf
.tags
.get(name
, [])
367 if value
and value
[0]:
368 return value
[0].encode("utf-16-le") + "\x00\x00"
371 texts
= map(render_text
, _standard_attribute_names
)
372 data
= struct
.pack("<HHHHH", *map(str.__len
__, texts
)) + "".join(texts
)
373 return self
.GUID
+ struct
.pack("<Q", 24 + len(data
)) + data
376 class ExtendedContentDescriptionObject(BaseObject
):
377 """Extended content description."""
378 GUID
= "\x40\xA4\xD0\xD2\x07\xE3\xD2\x11\x97\xF0\x00\xA0\xC9\x5E\xA8\x50"
380 def parse(self
, asf
, data
, fileobj
, size
):
381 super(ExtendedContentDescriptionObject
, self
).parse(asf
, data
, fileobj
, size
)
382 asf
.extended_content_description_obj
= self
383 num_attributes
, = struct
.unpack("<H", data
[0:2])
385 for i
in range(num_attributes
):
386 name_length
, = struct
.unpack("<H", data
[pos
:pos
+2])
388 name
= data
[pos
:pos
+name_length
].decode("utf-16-le").strip("\x00")
390 value_type
, value_length
= struct
.unpack("<HH", data
[pos
:pos
+4])
392 value
= data
[pos
:pos
+value_length
]
394 attr
= _attribute_types
[value_type
](data
=value
)
395 asf
.tags
.append((name
, attr
))
397 def render(self
, asf
):
398 attrs
= asf
.to_extended_content_description
.items()
399 data
= "".join([attr
.render(name
) for (name
, attr
) in attrs
])
400 data
= struct
.pack("<QH", 26 + len(data
), len(attrs
)) + data
401 return self
.GUID
+ data
404 class FilePropertiesObject(BaseObject
):
405 """File properties."""
406 GUID
= "\xA1\xDC\xAB\x8C\x47\xA9\xCF\x11\x8E\xE4\x00\xC0\x0C\x20\x53\x65"
408 def parse(self
, asf
, data
, fileobj
, size
):
409 super(FilePropertiesObject
, self
).parse(asf
, data
, fileobj
, size
)
410 length
, _
, preroll
= struct
.unpack("<QQQ", data
[40:64])
411 asf
.info
.length
= length
/ 10000000.0 - preroll
/ 1000.0
414 class StreamPropertiesObject(BaseObject
):
415 """Stream properties."""
416 GUID
= "\x91\x07\xDC\xB7\xB7\xA9\xCF\x11\x8E\xE6\x00\xC0\x0C\x20\x53\x65"
418 def parse(self
, asf
, data
, fileobj
, size
):
419 super(StreamPropertiesObject
, self
).parse(asf
, data
, fileobj
, size
)
420 channels
, sample_rate
, bitrate
= struct
.unpack("<HII", data
[56:66])
421 asf
.info
.channels
= channels
422 asf
.info
.sample_rate
= sample_rate
423 asf
.info
.bitrate
= bitrate
* 8
426 class HeaderExtensionObject(BaseObject
):
427 """Header extension."""
428 GUID
= "\xb5\x03\xbf_.\xa9\xcf\x11\x8e\xe3\x00\xc0\x0c Se"
430 def parse(self
, asf
, data
, fileobj
, size
):
431 super(HeaderExtensionObject
, self
).parse(asf
, data
, fileobj
, size
)
432 asf
.header_extension_obj
= self
433 datasize
, = struct
.unpack("<I", data
[18:22])
436 while datapos
< datasize
:
437 guid
, size
= struct
.unpack("<16sQ", data
[22+datapos
:22+datapos
+24])
438 if guid
in _object_types
:
439 obj
= _object_types
[guid
]()
441 obj
= UnknownObject(guid
)
442 obj
.parse(asf
, data
[22+datapos
+24:22+datapos
+size
], fileobj
, size
)
443 self
.objects
.append(obj
)
446 def render(self
, asf
):
447 data
= "".join([obj
.render(asf
) for obj
in self
.objects
])
448 return (self
.GUID
+ struct
.pack("<Q", 24 + 16 + 6 + len(data
)) +
449 "\x11\xD2\xD3\xAB\xBA\xA9\xcf\x11" +
450 "\x8E\xE6\x00\xC0\x0C\x20\x53\x65" +
451 "\x06\x00" + struct
.pack("<I", len(data
)) + data
)
454 class MetadataObject(BaseObject
):
455 """Metadata description."""
456 GUID
= "\xea\xcb\xf8\xc5\xaf[wH\x84g\xaa\x8cD\xfaL\xca"
458 def parse(self
, asf
, data
, fileobj
, size
):
459 super(MetadataObject
, self
).parse(asf
, data
, fileobj
, size
)
460 asf
.metadata_obj
= self
461 num_attributes
, = struct
.unpack("<H", data
[0:2])
463 for i
in range(num_attributes
):
464 (reserved
, stream
, name_length
, value_type
,
465 value_length
) = struct
.unpack("<HHHHI", data
[pos
:pos
+12])
467 name
= data
[pos
:pos
+name_length
].decode("utf-16-le").strip("\x00")
469 value
= data
[pos
:pos
+value_length
]
471 args
= {'data': value
, 'stream': stream
}
473 args
['dword'] = False
474 attr
= _attribute_types
[value_type
](**args
)
475 asf
.tags
.append((name
, attr
))
477 def render(self
, asf
):
478 attrs
= asf
.to_metadata
.items()
479 data
= "".join([attr
.render_m(name
) for (name
, attr
) in attrs
])
480 return (self
.GUID
+ struct
.pack("<QH", 26 + len(data
), len(attrs
)) +
484 class MetadataLibraryObject(BaseObject
):
485 """Metadata library description."""
486 GUID
= "\x94\x1c#D\x98\x94\xd1I\xa1A\x1d\x13NEpT"
488 def parse(self
, asf
, data
, fileobj
, size
):
489 super(MetadataLibraryObject
, self
).parse(asf
, data
, fileobj
, size
)
490 asf
.metadata_library_obj
= self
491 num_attributes
, = struct
.unpack("<H", data
[0:2])
493 for i
in range(num_attributes
):
494 (language
, stream
, name_length
, value_type
,
495 value_length
) = struct
.unpack("<HHHHI", data
[pos
:pos
+12])
497 name
= data
[pos
:pos
+name_length
].decode("utf-16-le").strip("\x00")
499 value
= data
[pos
:pos
+value_length
]
501 args
= {'data': value
, 'language': language
, 'stream': stream
}
503 args
['dword'] = False
504 attr
= _attribute_types
[value_type
](**args
)
505 asf
.tags
.append((name
, attr
))
507 def render(self
, asf
):
508 attrs
= asf
.to_metadata_library
509 data
= "".join([attr
.render_ml(name
) for (name
, attr
) in attrs
])
510 return (self
.GUID
+ struct
.pack("<QH", 26 + len(data
), len(attrs
)) +
515 ExtendedContentDescriptionObject
.GUID
: ExtendedContentDescriptionObject
,
516 ContentDescriptionObject
.GUID
: ContentDescriptionObject
,
517 FilePropertiesObject
.GUID
: FilePropertiesObject
,
518 StreamPropertiesObject
.GUID
: StreamPropertiesObject
,
519 HeaderExtensionObject
.GUID
: HeaderExtensionObject
,
520 MetadataLibraryObject
.GUID
: MetadataLibraryObject
,
521 MetadataObject
.GUID
: MetadataObject
,
526 """An ASF file, probably containing WMA or WMV."""
528 _mimes
= ["audio/x-ms-wma", "audio/x-ms-wmv", "video/x-ms-asf",
529 "audio/x-wma", "video/x-wmv"]
531 def load(self
, filename
):
532 self
.filename
= filename
533 fileobj
= file(filename
, "rb")
541 self
.info
= ASFInfo()
542 self
.tags
= ASFTags()
543 self
.__read
_file
(fileobj
)
548 # Move attributes to the right objects
549 self
.to_extended_content_description
= {}
550 self
.to_metadata
= {}
551 self
.to_metadata_library
= []
552 for name
, value
in self
.tags
:
553 if name
in _standard_attribute_names
:
555 if (value
.language
is None and value
.stream
is None and
556 name
not in self
.to_extended_content_description
):
557 self
.to_extended_content_description
[name
] = value
558 elif (value
.language
is None and value
.stream
is not None and
559 name
not in self
.to_metadata
):
560 self
.to_metadata
[name
] = value
562 self
.to_metadata_library
.append((name
, value
))
564 # Add missing objects
565 if not self
.content_description_obj
:
566 self
.content_description_obj
= \
567 ContentDescriptionObject()
568 self
.objects
.append(self
.content_description_obj
)
569 if not self
.extended_content_description_obj
:
570 self
.extended_content_description_obj
= \
571 ExtendedContentDescriptionObject()
572 self
.objects
.append(self
.extended_content_description_obj
)
573 if not self
.header_extension_obj
:
574 self
.header_extension_obj
= \
575 HeaderExtensionObject()
576 self
.objects
.append(self
.header_extension_obj
)
577 if not self
.metadata_obj
:
578 self
.metadata_obj
= \
580 self
.header_extension_obj
.objects
.append(self
.metadata_obj
)
581 if not self
.metadata_library_obj
:
582 self
.metadata_library_obj
= \
583 MetadataLibraryObject()
584 self
.header_extension_obj
.objects
.append(self
.metadata_library_obj
)
587 data
= "".join([obj
.render(self
) for obj
in self
.objects
])
588 data
= (HeaderObject
.GUID
+
589 struct
.pack("<QL", len(data
) + 30, len(self
.objects
)) +
592 fileobj
= file(self
.filename
, "rb+")
596 insert_bytes(fileobj
, size
- self
.size
, self
.size
)
598 delete_bytes(fileobj
, self
.size
- size
, 0)
604 def __read_file(self
, fileobj
):
605 header
= fileobj
.read(30)
606 if len(header
) != 30 or header
[:16] != HeaderObject
.GUID
:
607 raise ASFHeaderError
, "Not an ASF file."
609 self
.extended_content_description_obj
= None
610 self
.content_description_obj
= None
611 self
.header_extension_obj
= None
612 self
.metadata_obj
= None
613 self
.metadata_library_obj
= None
615 self
.size
, self
.num_objects
= struct
.unpack("<QL", header
[16:28])
617 for i
in range(self
.num_objects
):
618 self
.__read
_object
(fileobj
)
620 def __read_object(self
, fileobj
):
621 guid
, size
= struct
.unpack("<16sQ", fileobj
.read(24))
622 if guid
in _object_types
:
623 obj
= _object_types
[guid
]()
625 obj
= UnknownObject(guid
)
626 data
= fileobj
.read(size
- 24)
627 obj
.parse(self
, data
, fileobj
, size
)
628 self
.objects
.append(obj
)
630 def score(filename
, fileobj
, header
):
631 return header
.startswith(HeaderObject
.GUID
) * 2
632 score
= staticmethod(score
)