Field is a composite object with a factory
[riffle.git] / shuffle.py
blob0566a5be75782f7570f3959e7d236c57fa0af000
1 """
2 iPod Shuffle database access
4 Documentation:
5 - http://ipodlinux.org/ITunesDB#iTunesSD_file and further
7 Author: Artem Baguinski
8 """
10 import struct, os, sys
# Byte-order flags accepted by the 24-bit pack/unpack helpers and packers.
BIG_ENDIAN, LITTLE_ENDIAN = True, False

# Modes handed to open(): binary read, and binary read/write.
READ, WRITE = 'rb', 'w+b'
def pack_uint24(i, bigendian):
    """Pack an unsigned integer to a string of three bytes.

    The value is packed as a 32-bit unsigned integer in the requested byte
    order and the most significant byte is then dropped.
    """
    if not bigendian:
        # Little-endian: the most significant byte is the last of the four.
        return struct.pack("<I", i)[:3]
    # Big-endian: the most significant byte is the first of the four.
    return struct.pack(">I", i)[1:]
def unpack_uint24(s, bigendian):
    """Unpack an unsigned integer from the first three bytes of a byte string.

    s         -- byte string; only s[0:3] is looked at
    bigendian -- true if the most significant byte comes first

    Raises struct.error when fewer than three bytes are supplied.
    """
    chunk = s[0:3]
    # Widen to four bytes with a zero in the most significant position.
    # The padding is a bytes literal so that it can be concatenated with
    # binary data on every interpreter that has the b'' syntax.
    if bigendian:
        return struct.unpack('>I', b'\x00' + chunk)[0]
    return struct.unpack('<I', chunk + b'\x00')[0]
def pack_int24(i, bigendian):
    """Pack a signed integer to a string of three bytes.

    The value is packed as a 32-bit signed integer in the requested byte
    order; the three least significant bytes are returned.
    """
    if bigendian:
        fmt, keep = ">i", slice(1, 4)
    else:
        fmt, keep = "<i", slice(0, 3)
    return struct.pack(fmt, i)[keep]
def unpack_int24(s, bigendian):
    """Unpack a signed (two's complement) integer from the first three bytes.

    s         -- byte string; only s[0:3] is looked at
    bigendian -- true if the most significant byte comes first

    Raises struct.error when fewer than three bytes are supplied.
    """
    chunk = s[0:3]
    # Place the three bytes in the *upper* 24 bits of a signed 32-bit
    # integer, so that struct interprets bit 23 as the sign bit, then shift
    # the zero filler byte back out.  Python's >> is an arithmetic shift, so
    # the sign is preserved.  (The previous code tested bit 11 and masked
    # with 0xfff, which turned e.g. +2048 into -2048.)
    if bigendian:
        widened = struct.unpack('>i', chunk + b'\x00')[0]
    else:
        widened = struct.unpack('<i', b'\x00' + chunk)[0]
    return widened >> 8
class Skip(object):
    """Record member that steps over a fixed number of bytes.

    It offers the same read/write/set_bigendian methods as Field, but
    neither reads nor stores a value: both directions simply move the file
    position forward.
    """

    def __init__(self, n):
        self.size = n

    def skip(self, file):
        """Advance file by self.size bytes from its current position."""
        distance = self.size
        file.seek(distance, os.SEEK_CUR)

    def read(self, file, dict):
        """Step over the bytes; dict is not used."""
        self.skip(file)

    def write(self, file, dict):
        """Step over the bytes without writing anything; dict is not used."""
        self.skip(file)

    def set_bigendian(self, ignore):
        """Accepted for interface compatibility; there is nothing to set."""
        return None
class Field:
    """One value inside a record: a packer plus the place the value lives.

    The packer (an object with ``size``, ``pack`` and ``unpack``) converts
    between bytes and a Python value.  The value itself is either stored in
    a dictionary under a name (Field.Named) or is a fixed constant
    (Field.Const), optionally verified when reading.
    """

    class Named:
        """Value handler that keeps the value in a dictionary entry."""

        def __init__(self, name):
            self.name = name

        def get(self, dict):
            return dict[self.name]

        def put(self, dict, value):
            dict[self.name] = value

    class Const:
        """Value handler for a constant; can verify what was read."""

        def __init__(self, const, check):
            self.const = const
            self.check = check

        def get(self, dict):
            return self.const

        def put(self, dict, value):
            """Raise ValueError if checking is on and value is not the constant."""
            if self.check and value != self.const:
                # A string exception ('raise "Format error"') is itself a
                # TypeError on Python 2.6 and later, so raise a real one.
                raise ValueError("Format error")

    def __init__(self, packer, name=None, const=None, check = False):
        """Create a field.

        packer -- object providing size, pack(val) and unpack(str)
        name   -- dictionary key for a named field
        const  -- fixed value for a constant field
        check  -- for constant fields: verify the value on read

        Exactly one of name and const must be given, otherwise ValueError
        is raised.
        """
        self.packer = packer
        if name is not None and const is None:
            self.value_handler = Field.Named(name)
        elif name is None and const is not None:
            self.value_handler = Field.Const(const, check)
        else:
            # Without this the field was created without a value handler
            # and only failed later with an AttributeError on first use.
            raise ValueError("Field needs exactly one of 'name' or 'const'")

    def read(self, file, dict):
        """Read packer.size bytes from file and store the unpacked value."""
        self.put(dict, self.unpack( file.read( self.packer.size )))

    def write(self, file, dict):
        """Fetch the value, pack it and write it to file."""
        file.write( self.pack( self.value_handler.get(dict) ))

    def put(self, dict, val):
        self.value_handler.put(dict, val)

    def get(self, dict):
        return self.value_handler.get(dict)

    def unpack(self, str):
        return self.packer.unpack(str)

    def pack(self, val):
        return self.packer.pack(val)

    def set_bigendian(self, bigendian):
        """Set the byte-order flag on the packer."""
        self.packer.bigendian = bigendian
class SimpleField:
    """Packer for a single value described by one struct format string."""

    def __init__(self, fmt):
        # The size is derived from the format once, up front.
        self.size = struct.calcsize(fmt)
        self.fmt = fmt

    def pack(self, val):
        """Return val packed according to the format."""
        packed = struct.pack(self.fmt, val)
        return packed

    def unpack(self, str):
        """Return the first value unpacked from str according to the format."""
        values = struct.unpack(self.fmt, str)
        return values[0]
class Uint8(SimpleField):
    """Packer for one unsigned byte (struct format "B")."""

    def __init__(self):
        SimpleField.__init__(self, "B")
class Bool8(Uint8):
    """Packer for a boolean stored in one byte: 1 for true, 0 for false."""

    # No constructor of its own: Uint8's argument-less one is inherited.

    def pack(self, val):
        """Pack any truthy value as 1 and any falsy value as 0."""
        return Uint8.pack(self, int(bool(val)))

    def unpack(self, str):
        """Return True for any non-zero byte."""
        byte = Uint8.unpack(self, str)
        return byte != 0
class Uint24:
    """Packer for a three-byte unsigned integer with selectable byte order."""

    def __init__(self, bigendian = LITTLE_ENDIAN):
        self.bigendian = bigendian
        self.size = 3

    def pack(self, val):
        """Return val as three bytes in the configured byte order."""
        order = self.bigendian
        return pack_uint24(val, order)

    def unpack(self, str):
        """Return the unsigned integer held in the first three bytes of str."""
        order = self.bigendian
        return unpack_uint24(str, order)
class Int24:
    """Packer for a three-byte signed integer with selectable byte order."""

    def __init__(self, bigendian = LITTLE_ENDIAN):
        self.bigendian = bigendian
        self.size = 3

    def pack(self, val):
        """Return val as three bytes in the configured byte order."""
        order = self.bigendian
        return pack_int24(val, order)

    def unpack(self, str):
        """Return the signed integer held in the first three bytes of str."""
        order = self.bigendian
        return unpack_int24(str, order)
class Bool24(Int24):
    """Packer for a boolean stored in three bytes: -1 for true, 0 for false."""

    def __init__(self):
        # Takes no byte-order argument; Int24's default is used.
        Int24.__init__(self)

    def pack(self, val):
        """Pack any truthy value as -1 and any falsy value as 0."""
        if val:
            return Int24.pack(self, -1)
        return Int24.pack(self, 0)

    def unpack(self, str):
        """Return True for any non-zero stored value."""
        number = Int24.unpack(self, str)
        return number != 0
class ZeroPaddedString:
    """Packer for text stored in a fixed-size, zero-padded byte field."""

    def __init__(self, len, enc):
        """len is the field size in bytes, enc the name of the text encoding."""
        self.size = len
        self.enc = enc

    def pack(self, val):
        """Encode val and pad it with zero bytes up to the field size.

        Raises ValueError when the encoded text does not fit.  Writing it
        anyway would make the field longer than ``size`` and shift every
        byte that follows it in the record.
        """
        raw = val.encode(self.enc)
        if len(raw) > self.size:
            raise ValueError("%r needs %d bytes, field holds only %d"
                             % (val, len(raw), self.size))
        # The fill byte is a bytes literal: bytes.ljust() rejects a text
        # fill character on interpreters where str is not the byte type.
        return raw.ljust(self.size, b'\x00')

    def unpack(self, str):
        """Decode str and strip the trailing zero padding."""
        return str.decode(self.enc).rstrip('\x00')
class Record:
    """An ordered sequence of members that are read or written together.

    Each member must provide set_bigendian(flag), read(file, dict) and
    write(file, dict).
    """

    def __init__(self, fields, bigendian):
        # Push the record's byte order down to every member first.
        for member in fields:
            member.set_bigendian(bigendian)
        self.fields = fields

    def _each(self, action, file, dict):
        """Call the named method on every member, in order."""
        for member in self.fields:
            getattr(member, action)(file, dict)

    def read(self, file, dict):
        """Let every member read itself from file into dict."""
        self._each("read", file, dict)

    def write(self, file, dict):
        """Let every member write itself from dict to file."""
        self._each("write", file, dict)
class Track:
    """One track of the shuffle database together with its statistics.

    The class attributes below are the defaults for a new track; values
    read from the database files are stored on the instance.
    """

    supported_file_types = (".mp3", ".aa", ".m4a", ".m4b", ".m4p", ".wav")

    # __str__ prints the three time values multiplied by 0.256 as seconds.
    starttime = 0
    stoptime = 0
    volume = 0x64
    bookmarktime = -1
    playcount = 0
    skippedcount = 0
    filename = None
    file_type = 0
    bookmarkflag = False
    shuffleflag = True

    def set_filename(self, filename):
        """Set the file name and derive file_type and the two flags from it.

        Raises ValueError (leaving the track unchanged) when the extension
        is not a supported one.  Extension matching is case-sensitive.
        """
        if filename.endswith((".mp3", ".aa")):
            file_type = 1
        elif filename.endswith((".m4a", ".m4b", ".m4p")):
            file_type = 2
        elif filename.endswith(".wav"):
            file_type = 4
        else:
            # A string exception is itself a TypeError on Python 2.6+.
            raise ValueError("%s: unsupported file type" % (filename))
        self.filename = filename
        self.file_type = file_type
        # .aa and .m4b files get a bookmark and are kept out of the shuffle.
        self.bookmarkflag = filename.endswith((".aa", ".m4b"))
        self.shuffleflag = not self.bookmarkflag

    def __str__(self):
        s = "%s\n vol: %d " % (self.filename, self.volume)
        if self.starttime != 0 or self.stoptime != 0:
            s += "%5.3fs-%5.3fs " % (self.starttime*0.256, self.stoptime*0.256)
        if self.bookmarkflag:
            # A negative bookmark time is shown as zero.
            bm = max(self.bookmarktime, 0)
            s += "bookmark: %5.3fs " % (bm*0.256)
        if self.shuffleflag:
            s += "shuffle "
        s += "played: %d skipped: %d" % (self.playcount, self.skippedcount)
        return s

    # persistency
    # Tracks from a previous database, keyed both by file name and by
    # position in the list given to set_old_tracks().
    old_tracks = {}

    @classmethod
    def new(cls, filename):
        """Return the remembered track for filename, or create a new one.

        The lookup uses cls.old_tracks, the same attribute that
        set_old_tracks() assigns.
        """
        if filename in cls.old_tracks:
            return cls.old_tracks[filename]
        t = cls()
        t.set_filename(filename)
        return t

    @classmethod
    def set_old_tracks(cls, lst):
        """Remember the tracks in lst, replacing any previous ones."""
        remembered = {}
        for i, t in enumerate(lst):
            remembered[t.filename] = t
            remembered[i] = t
        cls.old_tracks = remembered
class PState:
    """Player state; the class attributes are the default values."""

    volume = 29
    shufflepos = 0
    trackno = 0
    shuffleflag = False
    trackpos = 0

    def __str__(self):
        """Return a multi-line, human-readable summary of the state."""
        template = """Player state:
volume: %d
shuffle mode: %s
shuffle position: %d
track number: %d
track position: %d"""
        values = (self.volume, self.shuffleflag,
                  self.shufflepos, self.trackno, self.trackpos)
        return template % values
class ShuffleDB:
    """Reader and writer for the iTunesSD, iTunesStats and iTunesPState files.

    The files are opened by bare name, i.e. in the current working
    directory.  At most one file is open at a time; ``endian`` and ``mode``
    describe the file that is currently open.
    """

    endian = BIG_ENDIAN
    file = None
    mode = READ

    class _AttrView(object):
        """Present an object's attributes through the dictionary protocol.

        Record members address values as dict[name].  Going through
        getattr/setattr instead of the instance __dict__ makes class-level
        defaults (such as those of a freshly created Track) visible too;
        with the raw __dict__ writing such a track raised KeyError.
        """

        def __init__(self, obj):
            self.obj = obj

        def __getitem__(self, name):
            return getattr(self.obj, name)

        def __setitem__(self, name, value):
            setattr(self.obj, name, value)

    def skip(self, n):
        """Move n bytes forward in the current file; no-op without a file."""
        if self.file is not None:
            self.file.seek(n, os.SEEK_CUR)

    def read(self, n):
        """Read n bytes from the current file; IOError if none is open."""
        if self.file is None:
            raise IOError("No current file")
        return self.file.read(n)

    def write(self, buf):
        """Write buf to the current file; IOError if none is open."""
        if self.file is None:
            raise IOError("No current file")
        self.file.write(buf)

    def read_u24(self, check = None):
        """Read an unsigned 24-bit integer in the current byte order.

        If check is given and differs from the value read, ValueError is
        raised.
        """
        u24 = unpack_uint24( self.read(3), self.endian )
        if check is not None and u24 != check:
            raise ValueError("Format error")
        return u24

    def read_u8(self):
        return struct.unpack("B", self.read(1))[0]

    def write_u8(self, val):
        self.write( struct.pack("B", val) )

    def read_i24(self, check = None):
        """Read a signed 24-bit integer in the current byte order.

        If check is given and differs from the value read, ValueError is
        raised.
        """
        i24 = unpack_int24( self.read(3), self.endian )
        if check is not None and i24 != check:
            raise ValueError("Format error")
        return i24

    def write_u24(self, u):
        self.write( pack_uint24(u, self.endian) )

    def write_i24(self, u):
        self.write( pack_int24(u, self.endian) )

    def read_bool(self):
        return self.read_u8() != 0

    def write_bool(self, v):
        if v: self.write_u8(1)
        else: self.write_u8(0)

    def read_bool24(self):
        return self.read_u24() != 0

    def write_bool24(self, v):
        if v: self.write_u24(1)
        else: self.write_u24(0)

    def read_string(self, raw_len):
        """Read raw_len bytes of UTF-16-LE text and strip trailing NULs."""
        return self.read(raw_len).decode("UTF-16-le").rstrip("\x00")

    def write_string(self, string, pad_to):
        """Write string as UTF-16-LE, zero-padded to pad_to bytes."""
        # The fill byte must be a bytes literal for bytes.ljust().
        self.write( string.encode("UTF-16-le").ljust(pad_to, b'\x00') )

    def open_file(self, fname, endian, mode=READ):
        """Close the current file, if any, and open fname (unless None).

        A file that was opened in WRITE mode is truncated at the current
        position before it is closed.
        """
        if self.file is not None:
            try:
                if self.mode == WRITE:
                    self.file.truncate()
            finally:
                self.file.close()
                # Forget the closed handle so that read()/write() report
                # "No current file" instead of failing on a closed file.
                self.file = None

        if fname is not None:
            self.file = open(fname,mode)

        self.endian = endian
        self.mode = mode

    def close_file(self):
        """Close the current file and reset byte order and mode."""
        self.open_file(None, BIG_ENDIAN)

    # Layout of one iTunesSD track record.  The member sizes add up to the
    # 558 given (and verified on read) in the first field.
    iTunesSD_track = Record([
        Field(Uint24(), const=558, check=True),
        Skip(3),
        Field(Uint24(), 'starttime'),
        Skip(6),
        Field(Uint24(), 'stoptime'),
        Skip(6),
        Field(Uint24(), 'volume'),
        Field(Uint24(), 'file_type'),
        Skip(3),
        Field(ZeroPaddedString(522, 'UTF-16-LE'), 'filename'),
        Field(Bool8(), 'shuffleflag'),
        Field(Bool8(), 'bookmarkflag'),
        Skip(1)],
        BIG_ENDIAN)

    def write_iTunesSD(self, tracks):
        """Write the 18-byte header and one record per track to iTunesSD."""
        self.open_file('iTunesSD', BIG_ENDIAN, WRITE)
        try:
            self.write_u24(len(tracks))
            self.write_u24(0x010800) # like iTunes 7.2 does
            self.write_u24(18) # header size
            self.skip(9)
            for t in tracks:
                self.iTunesSD_track.write(self.file, self._AttrView(t))
        finally:
            # Close the file even if a track could not be written.
            self.close_file()

    def read_iTunesSD(self):
        """Read iTunesSD and return the tracks as a list of Track objects."""
        self.open_file('iTunesSD', BIG_ENDIAN)
        try:
            num_tracks = self.read_u24()
            self.skip(15) # skip the rest of the header
            tracks = []
            for _ in range(num_tracks):
                t = Track()
                self.iTunesSD_track.read(self.file, self._AttrView(t))
                tracks.append( t )
        finally:
            self.close_file()
        return tracks

    def write_iTunesStats(self, tracks):
        """Write the statistics of all tracks to iTunesStats."""
        self.open_file('iTunesStats', LITTLE_ENDIAN, WRITE)
        try:
            self.write_u24( len(tracks) )
            self.skip(3)
            for t in tracks:
                self.write_iTunesStats_track(t)
        finally:
            self.close_file()

    def write_iTunesStats_track(self, t):
        """Write the 18-byte statistics record of track t."""
        self.write_u24( 18 ) # record length
        self.write_i24( t.bookmarktime )
        self.skip(6)
        self.write_u24( t.playcount )
        self.write_u24( t.skippedcount )

    def read_iTunesStats(self, tracks):
        """Read iTunesStats into the given tracks, in order.

        Raises ValueError when the file holds a different number of
        records than there are tracks.
        """
        self.open_file('iTunesStats', LITTLE_ENDIAN)
        try:
            num_tracks = self.read_u24()
            if num_tracks != len(tracks):
                raise ValueError("Inconsistent iTunesSD and iTunesStats")
            self.skip(3)
            for t in tracks:
                self.read_iTunesStats_track(t)
        finally:
            self.close_file()

    def read_iTunesStats_track(self, t):
        """Read one statistics record into track t."""
        self.read_u24( 18 ) # sanity check (record length)
        t.bookmarktime = self.read_i24()
        self.skip(6)
        t.playcount = self.read_u24()
        t.skippedcount = self.read_u24()

    def write_iTunesPState(self, pstate):
        """Write the player state to iTunesPState.

        An existing file is updated in place ('r+b'), so the bytes that
        are skipped keep their previous content.
        """
        mode = WRITE
        if os.path.exists('iTunesPState'):
            mode = 'r+b'
        self.open_file('iTunesPState', LITTLE_ENDIAN, mode)
        try:
            self.write_u8( pstate.volume )
            self.write_u24( pstate.shufflepos )
            self.write_u24( pstate.trackno )
            self.write_bool24( pstate.shuffleflag )
            self.write_u24( pstate.trackpos )
            # NOTE(review): 19 bytes are skipped here but only 16 in
            # read_iTunesPState -- confirm which matches the file layout.
            self.skip(19) # not deciphered yet
        finally:
            self.close_file()

    def read_iTunesPState(self):
        """Read iTunesPState and return it as a PState object."""
        pstate = PState()
        self.open_file('iTunesPState', LITTLE_ENDIAN)
        try:
            pstate.volume = self.read_u8()
            pstate.shufflepos = self.read_u24()
            pstate.trackno = self.read_u24()
            pstate.shuffleflag = self.read_bool24()
            pstate.trackpos = self.read_u24()
            self.skip(16)
        finally:
            self.close_file()
        return pstate

    def read_all(self):
        """Read all three files and return (tracks, pstate)."""
        tracks = self.read_iTunesSD()
        self.read_iTunesStats(tracks)
        pstate = self.read_iTunesPState()
        return (tracks, pstate)

    def write_all(self, tracks, pstate):
        """Write all three files."""
        self.write_iTunesSD(tracks)
        self.write_iTunesStats(tracks)
        self.write_iTunesPState(pstate)
429 #####################################################################
if __name__ == '__main__':
    # Self-test.  Usage: shuffle.py [DIR_TO_READ [DIR_TO_WRITE]]
    #
    # print is called with exactly one parenthesised argument throughout,
    # which gives the same output whether print is a statement or a
    # function.

    # try conversions: every line printed should be True
    b100 = pack_uint24(100,True)
    l100 = pack_uint24(100,False)
    print(b100 == b'\x00\x00d')
    print(l100 == b'd\x00\x00')
    print(unpack_uint24(b100,True) == 100)
    print(unpack_uint24(l100,False) == 100)

    b100 = pack_int24(-100,True)
    l100 = pack_int24(-100,False)
    print(b100 == b'\xff\xff\x9c')
    print(l100 == b'\x9c\xff\xff')
    print(unpack_int24(b100,True) == -100)
    print(unpack_int24(l100,False) == -100)

    def print_list(xs):
        for x in xs:
            print(x)

    if len(sys.argv) > 1:
        # try reading
        start_dir = os.getcwd()
        os.chdir(sys.argv[1])
        db = ShuffleDB()
        tracks, pstate = db.read_all()
        print_list( tracks )
        print(pstate)

        # try cache
        Track.set_old_tracks( tracks )
        t = Track.new( "foo.mp3" )
        print(t)
        if tracks:
            # An empty database has no first track to look up.
            t = Track.new( tracks[0].filename )
            print(t)

        if len(sys.argv) > 2:
            # try writing; go back first so that a relative second
            # argument is resolved against the starting directory
            os.chdir(start_dir)
            os.chdir(sys.argv[2])
            db.write_all(tracks, pstate)