feed list view and feed/category event handling fixes
[straw.git] / src / lib / ItemStore.py
blob566b5e24f145c08195e0d69f22f6f97109c11eaa
1 """ ItemStore.py
3 Data store abstraction module.
4 """
5 __copyright__ = "Copyright (c) 2002-2005 Free Software Foundation, Inc."
6 __license__ = """
7 Straw is free software; you can redistribute it and/or modify it under the
8 terms of the GNU General Public License as published by the Free Software
9 Foundation; either version 2 of the License, or (at your option) any later
10 version.
12 Straw is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License along with
17 this program; if not, write to the Free Software Foundation, Inc., 59 Temple
18 Place - Suite 330, Boston, MA 02111-1307, USA. """
20 import cPickle as pickle
21 import os, sys
22 import time
23 from error import *
24 import tempfile
25 import traceback
27 try:
28 from bsddb.db import *
29 import bsddb
30 except ImportError:
31 from bsddb3.db import *
32 import bsddb3 as bsddb
34 import Event
35 import SummaryItem
36 import feeds
37 import ImageCache
38 from MainloopManager import MainloopManager
# Name of the Berkeley DB file kept inside the straw home directory.
DATABASE_FILE_NAME = "itemstore.db"
class ConvertException(Exception):
    """Raised when migrating the item database between schema versions fails.

    Attributes:
        version1 -- schema version the conversion started from
        version2 -- schema version the conversion was aiming for
        reason   -- human-readable explanation (e.g. path of a traceback dump)
    """

    def __init__(self, version1, version2, reason):
        self.version1, self.version2, self.reason = version1, version2, reason
class MyDB:
    """Transactional Berkeley DB store for feed items and cached images.

    A single BTree database inside a DBEnv holds several kinds of record,
    distinguished by namespaced string keys:
        "fids:<feed id>"      -> pickled list of item ids belonging to a feed
        "<feed id>:<item id>" -> pickled item dictionary (see stringify_item)
        "images"              -> pickled list of cached image urls
        "image:<url>"         -> raw image data
        "imagecount:<url>"    -> reference count for a cached image
        "straw_db_version"    -> schema version number
    """
    # Schema version written to freshly created databases.  Older files are
    # upgraded one step at a time by convert_old_versions() through the
    # convert_<n>_<n+1> methods below.
    CURRENT_VERSION = 3

    def __init__(self, filename, dbhome, create=0, truncate=0, mode=0600,
                 recover=0, dbflags=0):
        """Open (creating/recovering as needed) the environment and database.

        filename -- database file name inside dbhome
        dbhome   -- directory holding the db file, logs and region files
        create   -- create the database file if it does not exist
        truncate -- discard existing contents on open
        Raises ConvertException when a schema migration fails.
        NOTE(review): 'recover' and 'dbflags' are accepted but never used.
        """
        self._db = None
        self._env = None
        recoverenv = DB_CREATE | DB_RECOVER
        # DB_INIT_TXN automatically enables logging
        flagsforenv = DB_INIT_TXN | DB_INIT_MPOOL | DB_INIT_LOCK | DB_PRIVATE
        self._env = DBEnv()
        self._env.set_data_dir(dbhome)
        self._env.set_lk_detect(DB_LOCK_DEFAULT) # enable auto deadlock avoidance
        self._env.set_lg_max(2**20)
        self._env.set_lk_max_locks (10000)
        self._env.set_lk_max_objects (10000)
        try:
            self._env.open(dbhome, recoverenv | flagsforenv, mode)
        except bsddb._db.DBRunRecoveryError, err:
            # Automatic recovery failed: remove the environment files and
            # bail out; the README documents manual recovery.
            self._env.remove(dbhome)
            self._env.close()
            log("%s" % err[1])
            sys.exit("Recovery Error: See README for details on how to recover data. ")
        flags = 0
        if truncate:
            flags |= DB_TRUNCATE
        try:
            # DB_AUTO_COMMIT only exists in newer bsddb releases; fall back
            # silently when the name is missing.
            flags |= DB_AUTO_COMMIT
        except NameError:
            pass
        try:
            self._db = DB(self._env)
            self._db.open(filename, DB_BTREE, flags, mode)
        except bsddb._db.DBNoSuchFileError:
            if create:
                self._db = DB(self._env)
                self._db.open(filename, DB_BTREE, flags | DB_CREATE, mode)
                self.set_db_version(self.CURRENT_VERSION)
            else:
                raise
        try:
            self.convert_old_versions()
        except Exception, ex:
            try:
                # Dump the conversion traceback to a temp file and report
                # its path through the ConvertException.
                # NOTE(review): 'filename' shadows the parameter here, and
                # 'fh' is unbound in the finally clause if mktemp/open fails;
                # tempfile.mktemp is also race-prone (mkstemp is preferred).
                filename = tempfile.mktemp(prefix="straw-")
                fh = open(filename, "w")
                traceback.print_exc(None, fh)
                raise ConvertException(self.get_db_version(),
                                       MyDB.CURRENT_VERSION, "%s" % filename)
            finally:
                fh.close()

    def close(self):
        """Close the database and its environment (safe to call twice)."""
        if self._db is not None:
            self._db.close()
            self._db = None
        if self._env is not None:
            self._env.close()
            self._env = None

    def checkpoint(self):
        """Checkpoint the transaction log and delete obsolete log files."""
        # set flags to 0 or DB_FORCE, else will raise EINVAL (InvalidArgError)
        cpflags = 0
        self._env.txn_checkpoint(cpflags | DB_FORCE )
        deletees = self._env.log_archive(DB_ARCH_ABS)
        for d in deletees:
            os.remove(d)

    def begin_transaction(self):
        """Start and return a new database transaction."""
        return self._env.txn_begin()

    def get_item_ids(self, iid, txn):
        """Return the (possibly empty) list of item ids for feed id 'iid'."""
        key = "fids:%d" % iid
        dids = self._db.get(key, txn=txn)
        ids = []
        if dids:
            ids = pickle.loads(dids)
        return ids

    def save_feed_item_ids(self, feed, ids, txn=None):
        """Persist 'ids' as the complete item-id list of 'feed'.

        Runs in its own transaction (commit/abort handled here) when the
        caller does not supply 'txn'.
        """
        rowid = "fids:%d" % feed.id
        commit = 0
        if not txn:
            txn = self.begin_transaction()
            commit = 1
        try:
            try:
                # Replace any previous list wholesale.
                self._db.delete(rowid, txn=txn)
            except DBNotFoundError:
                pass
            self._db.put(rowid, pickle.dumps(ids), txn=txn)
        except Exception, ex:
            if commit:
                txn.abort()
            logtb(str(ex))
        else:
            if commit:
                txn.commit()

    def get_item(self, feed_id, item_id, txn=None):
        """Load one item; returns None when missing or unreadable."""
        item = self._db.get("%d:%d" % (feed_id, item_id), txn=txn)
        return unstringify_item(item)

    def add_items(self, feed, items):
        """Store new items of 'feed' and append their ids to its id list."""
        txn = self.begin_transaction()
        try:
            feed_item_ids = self.get_item_ids(feed.id, txn=txn)
            for item in items:
                self._db.put("%d:%d" % (item.feed.id, item.id), stringify_item(item), txn=txn)
                # TODO: it might be a good idea to check here that we don't add
                # duplicate items. It doesn't happen normally, but there can be
                # bugs that trigger that. Throwing an exception would be the
                # the right thing: it wouldn't hide the breakage.
                feed_item_ids.append(item.id)
            self.save_feed_item_ids(feed, feed_item_ids, txn)
        except Exception, ex:
            txn.abort()
            logtb(str(ex))
        else:
            txn.commit()

    def delete_items(self, feed, items):
        """Deletes a list of items.

        Useful for cutting old items based on number of items stored.
        """
        txn = self.begin_transaction()
        try:
            feed_item_ids = self.get_item_ids(feed.id, txn=txn)
            # because of bugs, we sometime get here duplicate ids. instead of dying,
            # warn the user but continue
            item_ids = []
            for item in items:
                item.clean_up()
                if item.id in item_ids:
                    log("WARNING: skipping duplicate ids in delete items request %s and %s" % (item.title, item.id))
                    # filter out any duplicates
                    feed_item_ids = filter(lambda x: x != item.id, feed_item_ids)
                    continue
                item_ids.append(item.id)
                #log("deleting item %d:%d" % (feed.id, item.id))
                if item.id in feed_item_ids:
                    feed_item_ids.remove(item.id)
                self._db.delete("%d:%d" % (feed.id, item.id), txn=txn)
            self.save_feed_item_ids(feed, feed_item_ids, txn)
        except Exception, ex:
            txn.abort()
            log_exc("error while deleting items")
        else:
            txn.commit()

    def modify_items(self, items):
        """Rewrite already-stored items inside one transaction."""
        txn = self.begin_transaction()
        try:
            for item in items:
                self._db.put("%d:%d" % (item.feed.id, item.id),
                             stringify_item(item), txn=txn)
        except Exception, ex:
            txn.abort()
            logtb(str(ex))
        else:
            txn.commit()

    def get_feed_items(self, feed):
        """Return all stored items of 'feed'; unreadable ids are skipped."""
        txn = self.begin_transaction()
        items = []
        try:
            ids = self.get_item_ids(feed.id, txn=txn)
            for id in ids:
                item = self.get_item(feed.id, id, txn=txn)
                if item is not None:
                    items.append(item)
        except Exception, ex:
            txn.abort()
            raise
            #XXX log(str(ex))
        else:
            txn.commit()
        return items

    def get_number_of_unread(self, fid, cutoff):
        """Count unread items among the 'cutoff' most recent of feed 'fid'."""
        # Used by config conversion
        # NOTE: this is the number of unread items in 'number of items stored'
        # preference. Since straw stores the most recent items down the list,
        # we only count the unread items from the most recent N items,
        # where N = cutoff.
        txn = self.begin_transaction()
        num_unread = 0
        try:
            ids = self.get_item_ids(fid, txn=txn)
            for id in ids[len(ids)-cutoff:]:
                item = self.get_item(fid, id, txn=txn)
                if item is not None and item.seen == 0:
                    num_unread += 1
                else: continue
        except Exception, ex:
            txn.abort()
            logtb(str(ex))
        else:
            txn.commit()
        return num_unread

    def get_image_urls(self, txn=None):
        """Return the list of urls of all cached images."""
        dkeys = self._db.get("images", txn=txn)
        keys = []
        if dkeys is not None:
            keys = pickle.loads(dkeys)
        return keys

    def save_image_urls(self, urls, txn=None):
        """Persist the list of cached image urls."""
        self._db.put("images", pickle.dumps(urls), txn=txn)

    def get_image_counts(self, txn=None):
        """Return (url, refcount) pairs for every cached image url."""
        images = self.get_image_urls(txn)
        counts = []
        for image in images:
            key = ("imagecount:" + image).encode('utf-8')
            value = self._db.get(str(key))
            try:
                counts.append((image, int(value)))
            except:
                # NOTE(review): log() is called here with multiple positional
                # arguments, unlike the single-string calls elsewhere --
                # verify against the 'error' module's signature.
                log("exception for ", key, ", type of value ", value, ": ", type(value))
        return counts

    def update_image_count(self, url, count):
        """Set an image's reference count; counts below 1 drop the record."""
        #logparam(locals(), "url", "count")
        key = ("imagecount:" + url).encode('utf-8')
        txn = self.begin_transaction()
        try:
            if count < 1:
                self._db.delete(key, txn=txn)
            else:
                self._db.put(key, str(count), txn=txn)
        except:
            txn.abort()
            raise
        else:
            txn.commit()

    def update_image(self, url, image):
        """Store image data for 'url', or delete it when 'image' is None.

        Keeps the "images" url list record in sync with the stored data.
        """
        key = "image:%s" % str(url)
        txn = self.begin_transaction()
        try:
            image_urls = self.get_image_urls(txn)
            if image is not None:
                self._db.put(key.encode('utf-8'), image, txn=txn)
                if url not in image_urls:
                    image_urls.append(url)
                    self.save_image_urls(image_urls, txn)
            else:
                if url in image_urls:
                    try:
                        self._db.delete(key, txn=txn)
                    except DBNotFoundError:
                        log("Key not found", key)
                    image_urls.remove(url)
                    self.save_image_urls(image_urls, txn=txn)
        except:
            txn.abort()
            raise
        else:
            txn.commit()

    def get_image_data(self, url, txn=None):
        """Return raw image bytes for 'url', or None when absent."""
        return self._db.get(
            "image:%s" % url.encode('utf-8'), default = None, txn=txn)

    def _image_print(self, key, data):
        # Helper for _db_print: show only image record keys.
        if key[:6] == "image:":
            print key

    def _data_print(self, key, data):
        # Helper for _db_print: unpickle and pretty-print one record.
        data = pickle.loads(data)
        pprint ({key: data})

    def _db_print(self, helper):
        """Print the database to stdout for debugging"""
        print "******** Printing raw database for debugging ********"
        print "database version: %s" % self.get_db_version()
        cur = self._db.cursor()
        try:
            key, data = cur.first()
            while 1 :
                helper(key, data)
                next = cur.next()
                if next:
                    key, data = next
                # NOTE(review): there is no 'else: break' here -- once the
                # cursor is exhausted this loops forever re-printing the
                # last record (compare the loop in convert_1_2).
        finally:
            cur.close()

    def get_db_version(self, txn=None):
        """Return the stored schema version (defaults to 1 when unset)."""
        version = self._db.get("straw_db_version", default = "1", txn=txn)
        return int(version)

    def set_db_version(self, version, txn=None):
        """Write the schema version number.

        NOTE(review): when a caller supplies 'txn', this method still
        aborts/commits that transaction itself -- callers must not reuse it.
        """
        try:
            if txn is None:
                txn = self.begin_transaction()
            self._db.put("straw_db_version", str(version), txn=txn)
        except:
            txn.abort()
            raise
        else:
            txn.commit()

    def convert_old_versions(self):
        """Upgrade the schema step by step until CURRENT_VERSION is reached."""
        version = self.get_db_version()
        while version < self.CURRENT_VERSION:
            next = version + 1
            mname = "convert_%d_%d" % (version, next)
            try:
                method = getattr(self, mname)
            except AttributeError:
                raise ConvertException(version, next, "No conversion function specified")
            method()
            self.set_db_version(next)
            version = next

    def convert_1_2(self):
        """v1 -> v2: convert mx.DateTime pub_dates to plain time tuples."""
        def is_item(key):
            # Item records have a "<feed id>:<item id>" key.
            parts = key.split(':')
            if len(parts) != 2:
                return False
            return parts[0].isdigit() and parts[1].isdigit()
        def round_second(ttuple):
            # Round the (possibly fractional) seconds field to an int.
            l = list(ttuple)
            l[5] = int(round(l[5]))
            return tuple(l)
        try:
            import mx.DateTime as mxd
        except ImportError:
            raise ConvertException(1, 2, _("Couldn't import mx.DateTime"))
        txn = self.begin_transaction()
        try:
            cur = self._db.cursor(txn=txn)
            try:
                next = cur.first()
                key = None
                if next:
                    key, data = cur.first()
                while key is not None:
                    if is_item(key):
                        dict = pickle.loads(data)
                        if isinstance(dict['pub_date'], mxd.DateTimeType):
                            p = dict['pub_date']
                            t = time.gmtime(time.mktime(round_second(p.tuple())))
                            dict['pub_date'] = t
                            data = pickle.dumps(dict)
                            cur.put(key, data, DB_CURRENT)
                    next = cur.next()
                    if next:
                        key, data = next
                    else:
                        break
            finally:
                cur.close()
        except Exception, ex:
            txn.abort()
            raise
        else:
            txn.commit()

    def convert_2_3(self):
        """v2 -> v3: derive per-image reference counts from item records."""
        def is_item(key):
            parts = key.split(':')
            if len(parts) != 2:
                return False
            return parts[0].isdigit() and parts[1].isdigit()
        imagelistcursor = None
        images = {}
        txn = self.begin_transaction()
        try:
            cur = self._db.cursor(txn=txn)
            try:
                next = cur.first()
                key = None
                if next:
                    key, data = cur.first()
                while key is not None:
                    if is_item(key):
                        dic = pickle.loads(data)
                        for image in dic['images']:
                            images[image] = images.get(image, 0) + 1
                    elif key == "images":
                        # Remember where the url-list record lives so it can
                        # be rewritten in place after the scan.
                        imagelistcursor = cur.dup(DB_POSITION)
                    next = cur.next()
                    if next:
                        key, data = next
                    else:
                        break
                for image, count in images.items():
                    key = ("imagecount:" + image).encode('utf-8')
                    cur.put(key, str(count), DB_KEYFIRST)
                imagelistcursor.put("images", pickle.dumps(images.keys()), DB_CURRENT)
            finally:
                cur.close()
                if imagelistcursor != None:
                    imagelistcursor.close()
        except Exception, ex:
            txn.abort()
            raise
        else:
            txn.commit()
class ModifyItemAction:
    """Deferred store operation: rewrite one changed item."""

    def __init__(self, item):
        # Keep the item pre-wrapped in a list so doit() can hand it
        # straight to the store's batch API.
        self._items = [item]

    def doit(self, db):
        """Apply the modification against the given store."""
        db.modify_items(self._items)
class ModifyItemsAction:
    """Deferred store operation: rewrite a batch of changed items."""

    def __init__(self, items):
        self._pending = items

    def doit(self, db):
        """Flush the queued batch to the store."""
        db.modify_items(self._pending)
class ItemsAddedAction:
    """Deferred store operation: persist newly fetched items of a feed."""

    def __init__(self, feed, items):
        self._feed = feed
        self._new_items = items

    def doit(self, db):
        """Insert the queued items under their feed."""
        db.add_items(self._feed, self._new_items)
class DeleteItemsAction:
    """Deferred store operation: remove a feed's items from the store."""

    def __init__(self, feed, items):
        self._feed, self._doomed = feed, items

    def doit(self, db):
        """Delete the queued items."""
        db.delete_items(self._feed, self._doomed)
class ImageUpdateAction:
    """Deferred store operation: save or clear cached image data."""

    def __init__(self, url, image):
        self._url = url
        self._data = image

    def doit(self, db):
        # The store treats None data as a request to drop the entry.
        db.update_image(self._url, self._data)
class ImageCountChangedAction:
    """Deferred store operation: update an image's reference count."""

    def __init__(self, url, count):
        self._url = url
        self._refcount = count

    def doit(self, db):
        """Persist the new count (the store drops entries when below 1)."""
        db.update_image_count(self._url, self._refcount)
class ItemStore:
    """Mediates between feed/image signals and the on-disk MyDB store.

    Mutations are not written synchronously: each signal handler queues an
    action object, and a repeating mainloop timer drains the queue against
    the database in _run().
    """

    def __init__(self, dbhome):
        # dbhome: directory where the Berkeley DB environment lives.
        feedlist = feeds.get_instance()
        self._db = MyDB(DATABASE_FILE_NAME, dbhome, create = 1)
        self.connect_signals()
        feedlist.connect('updated', self._feed_created_cb)
        feedlist.connect('deleted', self._feed_deleted_cb)
        ImageCache.cache.signal_connect(Event.ImageUpdatedSignal,
                                        self.image_updated)
        self._stop = False
        self._action_queue = []

    def _feed_created_cb(self, flist, feed, *args):
        # Start persisting a feed that was just added to the feed list.
        self._connect_feed_signals(feed)

    def _feed_deleted_cb(self, flist, feed):
        self._disconnect_feed_signals(feed)

    def connect_signals(self):
        """Hook up item signals for every feed that already exists."""
        flist = feeds.get_instance().flatten_list()
        for f in flist:
            self._connect_feed_signals(f)

    def _connect_feed_signals(self, feed):
        # Subscribe to per-feed item lifecycle signals.
        feed.connect('items-updated', self.items_added)
        feed.connect('items-read', self.all_items_read)
        feed.connect('items-deleted', self.items_deleted)
        # XXX
        #feed.signal_connect(Event.ItemReadSignal, self.item_modified)
        #feed.signal_connect(Event.ItemStickySignal, self.item_modified)

    def _disconnect_feed_signals(self, feed):
        # NOTE(review): currently a no-op; the old signal-API calls are
        # kept below for reference only.
        #feed.signal_disconnect(Event.NewItemsSignal, self.items_added)
        #feed.signal_disconnect(Event.ItemReadSignal, self.item_modified)
        #feed.signal_disconnect(Event.ItemStickySignal, self.item_modified)
        #feed.signal_disconnect(Event.AllItemsReadSignal, self.all_items_read)
        #feed.signal_disconnect(Event.ItemDeletedSignal, self.item_deleted)
        pass

    def modify_item(self, item):
        """Queue a write-through of a single changed item."""
        self._action_queue.append(ModifyItemAction(item))
        return

    def image_updated(self, signal):
        # Queue storing (or clearing, when signal.data is None) image data.
        self._action_queue.append(
            ImageUpdateAction(signal.url, signal.data))

    def read_image(self, url):
        """Read raw image data for 'url' directly from the store."""
        return self._db.get_image_data(url)

    def items_deleted(self, feed, items):
        self._action_queue.append(DeleteItemsAction(feed, items))

    def item_modified(self, signal):
        self.modify_item(signal.item)

    def all_items_read(self, feed, items):
        # Mark-all-read is persisted by rewriting the affected items.
        self._action_queue.append(ModifyItemsAction(items))

    def items_added(self, feed, items):
        self._action_queue.append(ItemsAddedAction(feed, items))

    def read_feed_items(self, feed):
        """Load all stored items of 'feed' (bypasses the action queue)."""
        return self._db.get_feed_items(feed)

    def get_number_of_unread(self, feed_id, cutoff):
        return self._db.get_number_of_unread(feed_id, cutoff)

    def get_image_counts(self):
        return self._db.get_image_counts()

    def set_image_count(self, image, count):
        self._action_queue.append(
            ImageCountChangedAction(image, count))

    def start(self):
        """Begin draining the action queue every 5 seconds."""
        mlmgr = MainloopManager.get_instance()
        mlmgr.set_repeating_timer(5000, self._run)

    def stop(self):
        """Stop the flush timer, checkpoint and close the database.

        NOTE(review): _stop is set only after the db is closed; a _run
        callback racing with stop() could still touch the closed db.
        """
        mlmgr = MainloopManager.get_instance()
        mlmgr.end_repeating_timer(self._run)
        self._db.checkpoint()
        self._db.close()
        self._stop = True

    def _run(self):
        # Periodic timer callback: drain the action queue and checkpoint.
        self._db.checkpoint()
        # NOTE(review): these counters are re-initialised on every call, so
        # the freq/cpfreq throttling below never accumulates between
        # invocations -- they look like they were meant to be instance
        # state from an older threaded implementation.
        freq = 5
        timer = freq
        cpfreq = 60
        cptimer = cpfreq
        prevtime = time.time()
        if not self._stop:
            tmptime = time.time()
            timer += tmptime - prevtime
            cptimer += tmptime - prevtime
            prevtime = tmptime
            if timer > freq:
                try:
                    while len(self._action_queue):
                        action = self._action_queue.pop(0)
                        if action is None:
                            break
                        action.doit(self._db)
                except IndexError, e:
                    pass
                timer = 0
            if cptimer > cpfreq:
                self._db.checkpoint()
                cptimer = 0
itemstore_instance = None

def get_instance():
    """Return the process-wide ItemStore singleton, creating it lazily."""
    global itemstore_instance
    if itemstore_instance is not None:
        return itemstore_instance
    # Imported here to avoid a module-level import cycle with Config.
    import Config
    itemstore_instance = ItemStore(Config.straw_home())
    return itemstore_instance
def stringify_item(item):
    """Pickle the persistent fields of a summary item into a string.

    The inverse operation is unstringify_item()/_unpickle().
    """
    # Fields copied verbatim from same-named item attributes.
    plain_fields = (
        'title', 'link', 'description', 'guid', 'guidislink', 'pub_date',
        'source', 'seen', 'id', 'fm_license', 'fm_changes', 'creator',
        'contributors', 'license_urls', 'publication_name',
        'publication_volume', 'publication_number', 'publication_section',
        'publication_starting_page', 'enclosures')
    itemdict = {}
    for name in plain_fields:
        itemdict[name] = getattr(item, name)
    # Fields whose dict key differs from the attribute access.
    itemdict['images'] = item.image_keys()
    itemdict['sticky'] = item._sticky
    return pickle.dumps(itemdict)
def unstringify_item(itemstring):
    """Rebuild a SummaryItem from a pickled string, or return None.

    None (or a falsy/unreadable pickle) yields None; fields missing from
    older database versions fall back to defaults.
    """
    if not itemstring:
        return None
    idict = _unpickle(itemstring)
    if not idict:
        return None
    item = SummaryItem.SummaryItem()
    # Mandatory fields present in every db version.
    for attr in ('title', 'link', 'description', 'guid', 'pub_date',
                 'source'):
        setattr(item, attr, idict[attr])
    for image_key in idict['images']:
        item.restore_image(image_key)
    item.seen = idict['seen']
    item.id = idict['id']
    # Optional fields: older databases may lack them.
    item.guidislink = idict.get('guidislink', True)
    for attr in ('fm_license', 'fm_changes', 'creator', 'contributors',
                 'license_urls'):
        setattr(item, attr, idict.get(attr, None))
    item._sticky = idict.get('sticky', 0)
    item.enclosures = idict.get('enclosures', None)
    for attr in ('publication_name', 'publication_volume',
                 'publication_number', 'publication_section',
                 'publication_starting_page'):
        setattr(item, attr, idict.get(attr, None))
    return item
def _unpickle(istring):
    """Unpickle an item dictionary from 'istring'.

    Returns the unpickled object, or None when unpickling fails; failures
    are logged rather than propagated so a single corrupt record does not
    break loading a whole feed.
    """
    itemdict = None
    try:
        itemdict = pickle.loads(istring)
    except ValueError:
        # BUG FIX: this message used to interpolate the undefined name
        # 'itemstring' (the parameter is 'istring'), so the handler itself
        # raised a NameError instead of logging.
        log("ItemStore.unstringify_item: pickle.loads raised ValueError, argument was %s" % repr(istring))
    except Exception:
        # sys.exc_info() keeps this clause parseable on both old and new
        # Python while preserving the original str(ex) logging.
        logtb(str(sys.exc_info()[1]))
    return itemdict
if __name__ == '__main__':
    # Manual debugging entry point: dump every record of the user's item
    # database to stdout using the pretty-printing helper.
    from pprint import pprint
    db = MyDB("itemstore.db", "%s/.straw" % os.getenv('HOME'), create = 1)
    db._db_print(db._data_print)