Thread safe SQLite interaction
[gpodder.git] / src / gpodder / dbsqlite.py
blobd7fab2ad1538a060b6d535bbd69f4ed27a4edd25
1 # -*- coding: utf-8 -*-
3 # gPodder - A media aggregator and podcast client
4 # Copyright (c) 2005-2008 Thomas Perl and the gPodder Team
6 # gPodder is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # gPodder is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 # dbsqlite.py -- SQLite interface
20 # Justin Forest <justin.forest@gmail.com> 2008-06-13
# Locate a DB-API 2.0 SQLite binding: the "sqlite3" module bundled with
# Python 2.5 and newer is preferred, the external "pysqlite2" package is
# the fallback for older interpreters.
have_sqlite = True

try:
    from sqlite3 import dbapi2 as sqlite
except ImportError:
    try:
        from pysqlite2 import dbapi2 as sqlite
    except ImportError:
        have_sqlite = False

# TODO: show a message box
if not have_sqlite:
    import sys
    # Nothing in this module works without SQLite.  Handing the message to
    # sys.exit() writes it to stderr and terminates with exit status 1, so
    # the failure is visible to whoever launched the program (a plain
    # sys.exit() would report success).
    sys.exit("Please install pysqlite2 or upgrade to Python 2.5.")
38 from gpodder.liblogger import log
39 from email.Utils import mktime_tz
40 from email.Utils import parsedate_tz
41 from email.Utils import formatdate
42 from threading import RLock
43 import string
class LockingCursor(sqlite.Cursor):
    """
    Cursor subclass that serialises database access between threads.

    Creating a cursor acquires the class-wide lock and the lock is given
    back when the cursor object is finalised, so callers get the locking
    for free simply by asking for a cursor.  A thread may hold as many
    cursors as it needs as long as the installed lock is re-entrant.

    A queue feeding one dedicated SQLite worker thread would be the
    textbook alternative, but it has failure modes of its own, e.g. a
    thread that queues a request and dies before fetching the result.
    An example of that approach:

    http://code.activestate.com/recipes/526618/
    """

    # Lock shared by all cursors.  It has to be replaced by a real lock
    # object (Storage.__init__ installs an RLock) before the first cursor
    # is created.
    lock = None

    def __init__(self, *args, **kwargs):
        # Take the lock before the underlying cursor is set up.
        self.lock.acquire()
        super(LockingCursor, self).__init__(*args, **kwargs)

    def __del__(self):
        # Counterpart of the acquire() in __init__.
        self.lock.release()
class Storage(object):
    """
    SQLite-backed storage for channels and episodes.

    One connection is opened lazily (see the "db" property) and shared
    between threads.  Every cursor handed out by cursor() is a
    LockingCursor, so database access is serialised through the lock
    installed by __init__().
    """
    # Values stored in the "state" column of the episodes table.
    (STATE_NORMAL, STATE_DOWNLOADED, STATE_DELETED) = range(3)

    def __init__(self):
        # Replaced by setup(); the "db" property reads settings['database'].
        self.settings = {}
        # Channel URL -> channel id, filled by load_channels() and
        # save_channel() to avoid an extra query per lookup.
        self.channel_map = {}
        # The connection is created on first access of the "db" property.
        self._db = None
        # Re-entrant, so one thread can hold several cursors at once.
        LockingCursor.lock = RLock()

    def setup(self, settings):
        """
        Stores the settings and makes sure the database schema exists.

        settings needs a 'database' entry; it is passed to
        sqlite.connect() when the connection is opened.
        """
        self.settings = settings
        self.__check_schema()

    @property
    def db(self):
        """
        The shared SQLite connection, opened on first use.
        """
        if self._db is None:
            # check_same_thread=False lets every thread use this one
            # connection; LockingCursor provides the serialisation.
            self._db = sqlite.connect(self.settings['database'], check_same_thread=False)
            # Case-insensitive collation, used to sort channels by title.
            self._db.create_collation("unicode", lambda a, b: cmp(a.lower(), b.lower()))
            log('SQLite connected', sender=self)
        return self._db

    def cursor(self):
        """
        Returns a new LockingCursor on the shared connection.
        """
        return self.db.cursor(factory=LockingCursor)

    def commit(self):
        """
        Commits the current transaction while holding the cursor lock.
        """
        # grab a cursor to lock threads; the lock is given back when the
        # cursor object is finalised
        cur = self.cursor()
        self.db.commit()

    def __check_schema(self):
        """
        Creates all necessary tables and indexes that don't exist.
        """
        log('Setting up SQLite database', sender=self)

        cur = self.cursor()

        cur.execute("""CREATE TABLE IF NOT EXISTS channels (
            id INTEGER PRIMARY KEY,
            url TEXT,
            title TEXT,
            override_title TEXT,
            link TEXT,
            description TEXT,
            image TEXT,
            pubDate INTEGER,
            sync_to_devices INTEGER,
            device_playlist_name TEXT,
            username TEXT,
            password TEXT,
            last_modified TEXT,
            etag TEXT,
            deleted INTEGER
            )""")
        cur.execute("""CREATE UNIQUE INDEX IF NOT EXISTS idx_url ON channels (url)""")
        cur.execute("""CREATE INDEX IF NOT EXISTS idx_sync_to_devices ON channels (sync_to_devices)""")
        cur.execute("""CREATE INDEX IF NOT EXISTS idx_title ON channels (title)""")
        cur.execute("""CREATE INDEX IF NOT EXISTS idx_deleted ON channels (deleted)""")

        cur.execute("""
            CREATE TABLE IF NOT EXISTS episodes (
                id INTEGER PRIMARY KEY,
                channel_id INTEGER,
                url TEXT,
                title TEXT,
                length INTEGER,
                mimetype TEXT,
                guid TEXT,
                description TEXT,
                link TEXT,
                pubDate INTEGER,
                state INTEGER,
                played INTEGER,
                locked INTEGER
            )
            """)
        cur.execute("""CREATE UNIQUE INDEX IF NOT EXISTS idx_guid ON episodes (guid)""")
        cur.execute("""CREATE INDEX IF NOT EXISTS idx_channel_id ON episodes (channel_id)""")
        cur.execute("""CREATE INDEX IF NOT EXISTS idx_pubDate ON episodes (pubDate)""")
        cur.execute("""CREATE INDEX IF NOT EXISTS idx_state ON episodes (state)""")
        cur.execute("""CREATE INDEX IF NOT EXISTS idx_played ON episodes (played)""")
        cur.execute("""CREATE INDEX IF NOT EXISTS idx_locked ON episodes (locked)""")

        cur.close()

    def get_channel_stat(self, url_or_id, state=None, is_played=None, is_locked=None):
        """
        Returns the number of episodes of one channel matching the filters.

        url_or_id -- the channel id when it is an int, otherwise the
                     channel URL
        state, is_played, is_locked -- only episodes with this value are
                     counted; None means the column is not filtered
        """
        where = []
        params = []

        if state is not None:
            where.append("state = ?")
            params.append(state)
        if is_played is not None:
            where.append("played = ?")
            params.append(is_played)
        if is_locked is not None:
            where.append("locked = ?")
            params.append(is_locked)

        if isinstance(url_or_id, int):
            where.append("channel_id = ?")
        else:
            where.append("channel_id IN (SELECT id FROM channels WHERE url = ?)")
        params.append(url_or_id)

        # The channel condition is always present, so there is always a
        # WHERE clause to build.
        return self.__get__("SELECT COUNT(*) FROM episodes WHERE %s" % (' AND '.join(where)), params)

    def load_channels(self, factory=None, url=None):
        """
        Returns channel descriptions as a list of dictionaries or objects,
        returned by the factory() function, which receives the dictionary
        as the only argument.

        Channels flagged as deleted are skipped.  When url is given, only
        the channel with that URL is returned and the url/id map is left
        untouched.
        """
        cur = self.cursor()
        cur.execute("""
            SELECT
                id,
                url,
                title,
                override_title,
                link,
                description,
                image,
                pubDate,
                sync_to_devices,
                device_playlist_name,
                username,
                password,
                last_modified,
                etag
            FROM
                channels
            WHERE
                (deleted IS NULL OR deleted = 0)
            ORDER BY
                title COLLATE unicode
            """)

        result = []
        for row in cur.fetchall():
            channel = {
                'id': row[0],
                'url': row[1],
                'title': row[2],
                'override_title': row[3],
                'link': row[4],
                'description': row[5],
                'image': row[6],
                'pubDate': self.__formatdate__(row[7]),
                'sync_to_devices': row[8],
                'device_playlist_name': row[9],
                'username': row[10],
                'password': row[11],
                'last_modified': row[12],
                'etag': row[13],
                }

            if url is None:
                # Maintain url/id relation for faster updates (otherwise
                # we'd need to issue an extra query to find the channel id).
                self.channel_map[channel['url']] = channel['id']

            if url is None or url == channel['url']:
                if factory is None:
                    result.append(channel)
                else:
                    result.append(factory(channel))

        cur.close()

        if url is None:
            log('Channel list read, %d entries.', len(result), sender=self)
        else:
            log('Channel %s read from db', url, sender=self)

        return result

    def save_channel(self, c, bulk=False):
        """
        Inserts the channel c, or updates it when its id is known or a
        row with the same URL already exists.  Updating also clears the
        "deleted" flag.

        bulk -- when true nothing is committed; the caller is expected to
                call commit() after the last save
        """
        if c.id is None:
            c.id = self.find_channel_id(c.url)

        cur = self.cursor()

        if c.id is None:
            cur.execute("INSERT INTO channels (url, title, override_title, link, description, image, pubDate, sync_to_devices, device_playlist_name, username, password, last_modified, etag) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", (c.url, c.title, c.override_title, c.link, c.description, c.image, self.__mktime__(c.pubDate), c.sync_to_devices, c.device_playlist_name, c.username, c.password, c.last_modified, c.etag, ))
            self.channel_map[c.url] = cur.lastrowid
            log('Added channel %s[%d]', c.url, self.channel_map[c.url], sender=self)
        else:
            cur.execute("UPDATE channels SET url = ?, title = ?, override_title = ?, link = ?, description = ?, image = ?, pubDate = ?, sync_to_devices = ?, device_playlist_name = ?, username = ?, password = ?, last_modified = ?, etag = ?, deleted = 0 WHERE id = ?", (c.url, c.title, c.override_title, c.link, c.description, c.image, self.__mktime__(c.pubDate), c.sync_to_devices, c.device_playlist_name, c.username, c.password, c.last_modified, c.etag, c.id, ))

        if not bulk:
            self.commit()

    def delete_channel(self, channel, purge=False):
        """
        Removes a channel and commits.

        purge=True  -- the channel row and all of its episodes are deleted
                       and the URL is dropped from the url/id map
        purge=False -- the channel is only flagged as deleted; its
                       episodes are removed except those in STATE_DELETED
        """
        if channel.id is None:
            channel.id = self.find_channel_id(channel.url)

        cur = self.cursor()

        if purge:
            cur.execute("DELETE FROM channels WHERE id = ?", (channel.id, ))
            cur.execute("DELETE FROM episodes WHERE channel_id = ?", (channel.id, ))
            self.channel_map.pop(channel.url, None)
        else:
            cur.execute("UPDATE channels SET deleted = 1 WHERE id = ?", (channel.id, ))
            cur.execute("DELETE FROM episodes WHERE channel_id = ? AND state <> ?", (channel.id, self.STATE_DELETED))

        self.commit()

    def __read_episodes(self, factory=None, where=None, params=None, commit=True):
        """
        Runs the episode SELECT and returns the rows as a list of
        dictionaries, or of factory(dictionary) results when a factory is
        given.

        where  -- SQL appended verbatim after "FROM episodes" (WHERE,
                  ORDER BY, LIMIT ...)
        params -- values for the placeholders used in "where"
        commit -- accepted for compatibility, not used
        """
        sql = "SELECT url, title, length, mimetype, guid, description, link, pubDate, state, played, locked FROM episodes"

        if where:
            sql = "%s %s" % (sql, where)

        if params is None:
            params = ()

        cur = self.cursor()
        cur.execute(sql, params)

        result = []
        for row in cur.fetchall():
            episode = {
                'url': row[0],
                'title': row[1],
                'length': row[2],
                'mimetype': row[3],
                'guid': row[4],
                'description': row[5],
                'link': row[6],
                'pubDate': row[7],
                'state': row[8],
                'is_played': row[9],
                'is_locked': row[10],
                }
            # Rows without a state are reported as normal episodes.
            if episode['state'] is None:
                episode['state'] = self.STATE_NORMAL
            if factory is None:
                result.append(episode)
            else:
                result.append(factory(episode))

        cur.close()

        return result

    def load_episodes(self, channel, factory=None, limit=1000, state=None):
        """
        Returns the episodes of a channel, newest first.

        Without a state filter the result holds every downloaded episode
        of the channel plus its "limit" most recent episodes.  With a
        state filter at most "limit" episodes in that state are returned.
        """
        if channel.id is None:
            channel.id = self.find_channel_id(channel.url)

        if state is None:
            # AND binds tighter than OR: (channel AND downloaded) OR
            # (one of the newest "limit" episodes of the channel).
            return self.__read_episodes(factory = factory, where = """
                WHERE channel_id = ? AND state = ? OR id IN
                (SELECT id FROM episodes WHERE channel_id = ?
                ORDER BY pubDate DESC LIMIT ?)
                ORDER BY pubDate DESC
                """, params = (channel.id, self.STATE_DOWNLOADED, channel.id, limit, ))
        else:
            return self.__read_episodes(factory = factory, where = " WHERE channel_id = ? AND state = ? ORDER BY pubDate DESC LIMIT ?", params = (channel.id, state, limit, ))

    def load_episode(self, url, factory=None):
        """
        Returns the first episode stored under url, or None when there is
        no such episode.
        """
        episodes = self.__read_episodes(factory = factory, where = " WHERE url = ?", params = (url, ))
        if episodes:
            return episodes[0]
        return None

    def save_episode(self, e, bulk=False):
        """
        Inserts the episode e, or updates its descriptive columns when an
        episode with the same id or guid is already stored.  Episodes
        without a guid are refused.

        Errors while saving are logged, not raised.

        bulk -- when true nothing is committed; the caller is expected to
                call commit() after the last save (same contract as
                save_channel())
        """
        import sys

        if not e.guid:
            log('Refusing to save an episode without guid: %s', e)
            return

        try:
            cur = self.cursor()
            channel_id = self.find_channel_id(e.channel.url)

            if e.id is None:
                e.id = self.__get__("SELECT id FROM episodes WHERE guid = ?", (e.guid, ))

            if e.id is None:
                log('Episode added: %s', e.title)
                cur.execute("INSERT INTO episodes (channel_id, url, title, length, mimetype, guid, description, link, pubDate, state, played, locked) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", (channel_id, e.url, e.title, e.length, e.mimetype, e.guid, e.description, e.link, self.__mktime__(e.pubDate), e.state, e.is_played, e.is_locked, ))
                e.id = cur.lastrowid
            else:
                log('Episode updated: %s', e.title)
                # url, guid and the state/played/locked flags are left alone.
                cur.execute("UPDATE episodes SET title = ?, length = ?, mimetype = ?, description = ?, link = ?, pubDate = ? WHERE id = ?", (e.title, e.length, e.mimetype, e.description, e.link, self.__mktime__(e.pubDate), e.id, ))
        except Exception:
            # Fetch the exception through sys.exc_info() instead of binding
            # it to a name, so the episode parameter "e" is not overwritten.
            log('save_episode() failed: %s', sys.exc_info()[1], sender=self)

        if not bulk:
            self.commit()

    def mark_episode(self, url, state=None, is_played=None, is_locked=None, toggle=False):
        """
        Changes the state/played/locked flags of the episode stored under
        url and commits.  Unknown URLs are ignored.

        toggle=False -- every argument that is not None replaces the
                        stored value
        toggle=True  -- "played" and/or "locked" are inverted when the
                        matching argument is true; state is not touched
        """
        cur = self.cursor()
        cur.execute("SELECT state, played, locked FROM episodes WHERE url = ?", (url, ))

        row = cur.fetchone()
        if row is None:
            # This only happens when we try to mark an unknown episode,
            # which is typical for database upgrade, so we just ignore it.
            return

        (cur_state, cur_played, cur_locked) = row

        if toggle:
            if is_played:
                cur_played = not cur_played
            if is_locked:
                cur_locked = not cur_locked
        else:
            if state is not None:
                cur_state = state
            if is_played is not None:
                cur_played = is_played
            if is_locked is not None:
                cur_locked = is_locked

        cur.execute("UPDATE episodes SET state = ?, played = ?, locked = ? WHERE url = ?", (cur_state, cur_played, cur_locked, url, ))
        self.commit()

    def __get__(self, sql, params=None):
        """
        Returns the first cell of a query result, useful for COUNT()s.

        None is returned when the query yields no row at all.
        """
        cur = self.cursor()

        if params is None:
            cur.execute(sql)
        else:
            cur.execute(sql, params)

        row = cur.fetchone()

        if row is None:
            return None
        return row[0]

    def __mktime__(self, date):
        """
        Converts a date to the value stored in the pubDate columns.

        Numbers are passed through, None and '' become None, and strings
        are parsed with parsedate_tz().  A string that cannot be parsed
        is logged and stored as None.
        """
        if isinstance(date, (float, int)):
            return date
        if date is None or '' == date:
            return None
        try:
            # parsedate_tz() returns None for input it cannot parse, which
            # makes mktime_tz() raise the TypeError handled below.
            return mktime_tz(parsedate_tz(date))
        except TypeError:
            log('Could not convert "%s" to a unix timestamp.', date)
            return None

    def __formatdate__(self, date):
        """
        Converts a stored timestamp into a date string using formatdate()
        with localtime=1.  Values formatdate() rejects with a TypeError
        are logged and returned as None.
        """
        # NOTE(review): formatdate() substitutes the current time when it
        # is given None, so a NULL pubDate yields "now" instead of taking
        # the except branch -- confirm that callers want this.
        try:
            return formatdate(date, localtime=1)
        except TypeError:
            log('Could not convert "%s" to a string date.', date)
            return None

    def find_channel_id(self, url):
        """
        Looks up the channel id in the map (which lists all undeleted
        channels), then tries to look it up in the database, including
        deleted channels.

        Returns None when the URL is not known at all.
        """
        if url in self.channel_map:
            return self.channel_map[url]
        return self.__get__("SELECT id FROM channels WHERE url = ?", (url, ))

    def force_last_new(self, channel):
        """
        Marks episodes of a channel as played so that only recent ones
        stay unplayed, then commits.

        If the channel has downloaded or deleted episodes, every unplayed
        episode older than the newest of them is marked as played.
        Otherwise every episode whose pubDate differs from the channel's
        newest pubDate is marked as played.
        """
        # Resolve a missing id the same way the other channel methods do.
        if channel.id is None:
            channel.id = self.find_channel_id(channel.url)

        old = self.__get__("""SELECT COUNT(*) FROM episodes WHERE channel_id = ?
                AND state IN (?, ?)""", (channel.id, self.STATE_DOWNLOADED,
                self.STATE_DELETED))
        log('old episodes in (%d)%s: %d', channel.id, channel.url, old)

        if old > 0:
            self.cursor().execute("""
                UPDATE episodes SET played = 1 WHERE channel_id = ?
                AND played = 0 AND pubDate < (SELECT MAX(pubDate)
                FROM episodes WHERE channel_id = ? AND state IN (?, ?))""",
                (channel.id, channel.id, self.STATE_DOWNLOADED,
                self.STATE_DELETED, ))
        else:
            self.cursor().execute("""
                UPDATE episodes SET played = 1 WHERE channel_id = ?
                AND pubDate <> (SELECT MAX(pubDate) FROM episodes
                WHERE channel_id = ?)""", (channel.id, channel.id, ))

        self.commit()
# Module-level Storage instance created at import time.  Creating it does
# not open the database: the connection is made on first use of its "db"
# property, which needs the settings passed to setup().
db = Storage()