# iteration 1 - Use gobject for events in feed, summaryitem and feedlist
# [straw.git] / src / lib / ItemStore.py
# blob 1730f3eb549a4ee1363b5b5e31e4db9441389094
1 """ ItemStore.py
3 Data store abstraction module.
4 """
5 __copyright__ = "Copyright (c) 2002-2005 Free Software Foundation, Inc."
6 __license__ = """
7 Straw is free software; you can redistribute it and/or modify it under the
8 terms of the GNU General Public License as published by the Free Software
9 Foundation; either version 2 of the License, or (at your option) any later
10 version.
12 Straw is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License along with
17 this program; if not, write to the Free Software Foundation, Inc., 59 Temple
18 Place - Suite 330, Boston, MA 02111-1307, USA. """
20 import cPickle as pickle
21 import os, sys
22 import time
23 from error import *
24 import tempfile
25 import traceback
27 try:
28 from bsddb.db import *
29 import bsddb
30 except ImportError:
31 from bsddb3.db import *
32 import bsddb3 as bsddb
34 import Event
35 import SummaryItem
36 import feeds
37 import ImageCache
38 from MainloopManager import MainloopManager
40 DATABASE_FILE_NAME = "itemstore.db"
class ConvertException(Exception):
    """Raised when the on-disk database cannot be migrated between schema versions.

    Attributes:
        version1: schema version the conversion started from
        version2: schema version the conversion was trying to reach
        reason:   human-readable explanation (may name a traceback file)
    """

    def __init__(self, version1, version2, reason):
        self.version1, self.version2, self.reason = version1, version2, reason
48 class MyDB:
49 CURRENT_VERSION = 3
51 def __init__(self, filename, dbhome, create=0, truncate=0, mode=0600,
52 recover=0, dbflags=0):
53 self._db = None
54 self._env = None
55 recoverenv = DB_CREATE | DB_RECOVER
56 # DB_INIT_TXN automatically enables logging
57 flagsforenv = DB_INIT_TXN | DB_INIT_MPOOL | DB_INIT_LOCK | DB_PRIVATE
59 self._env = DBEnv()
60 self._env.set_data_dir(dbhome)
61 self._env.set_lk_detect(DB_LOCK_DEFAULT) # enable auto deadlock avoidance
62 self._env.set_lg_max(2**20)
63 self._env.set_lk_max_locks (10000)
64 self._env.set_lk_max_objects (10000)
66 try:
67 self._env.open(dbhome, recoverenv | flagsforenv, mode)
68 except bsddb._db.DBRunRecoveryError, err:
69 self._env.remove(dbhome)
70 self._env.close()
71 log("%s" % err[1])
72 sys.exit("Recovery Error: See README for details on how to recover data. ")
74 flags = 0
75 if truncate:
76 flags |= DB_TRUNCATE
78 try:
79 flags |= DB_AUTO_COMMIT
80 except NameError:
81 pass
83 try:
84 self._db = DB(self._env)
85 self._db.open(filename, DB_BTREE, flags, mode)
86 except bsddb._db.DBNoSuchFileError:
87 if create:
88 self._db = DB(self._env)
89 self._db.open(filename, DB_BTREE, flags | DB_CREATE, mode)
90 self.set_db_version(self.CURRENT_VERSION)
91 else:
92 raise
93 try:
94 self.convert_old_versions()
95 except Exception, ex:
96 try:
97 filename = tempfile.mktemp(prefix="straw-")
98 fh = open(filename, "w")
99 traceback.print_exc(None, fh)
100 raise ConvertException(self.get_db_version(),
101 MyDB.CURRENT_VERSION, "%s" % filename)
102 finally:
103 fh.close()
105 def close(self):
106 if self._db is not None:
107 self._db.close()
108 self._db = None
109 if self._env is not None:
110 self._env.close()
111 self._env = None
113 def checkpoint(self):
114 # set flags to 0 or DB_FORCE, else will raise EINVAL (InvalidArgError)
115 cpflags = 0
116 self._env.txn_checkpoint(cpflags | DB_FORCE )
117 deletees = self._env.log_archive(DB_ARCH_ABS)
118 for d in deletees:
119 os.remove(d)
121 def begin_transaction(self):
122 return self._env.txn_begin()
124 def get_item_ids(self, iid, txn):
125 key = "fids:%d" % iid
126 dids = self._db.get(key, txn=txn)
127 ids = []
128 if dids:
129 ids = pickle.loads(dids)
130 return ids
132 def save_feed_item_ids(self, feed, ids, txn=None):
133 rowid = "fids:%d" % feed.id
134 commit = 0
135 if not txn:
136 txn = self.begin_transaction()
137 commit = 1
138 try:
139 try:
140 self._db.delete(rowid, txn=txn)
141 except DBNotFoundError:
142 pass
143 self._db.put(rowid, pickle.dumps(ids), txn=txn)
144 except Exception, ex:
145 if commit:
146 txn.abort()
147 logtb(str(ex))
148 else:
149 if commit:
150 txn.commit()
152 def get_item(self, feed_id, item_id, txn=None):
153 item = self._db.get("%d:%d" % (feed_id, item_id), txn=txn)
154 return unstringify_item(item)
156 def add_items(self, feed, items):
157 txn = self.begin_transaction()
158 try:
159 feed_item_ids = self.get_item_ids(feed.id, txn=txn)
160 for item in items:
161 self._db.put("%d:%d" % (item.feed.id, item.id), stringify_item(item), txn=txn)
162 # TODO: it might be a good idea to check here that we don't add
163 # duplicate items. It doesn't happen normally, but there can be
164 # bugs that trigger that. Throwing an exception would be the
165 # the right thing: it wouldn't hide the breakage.
166 feed_item_ids.append(item.id)
167 self.save_feed_item_ids(feed, feed_item_ids, txn)
168 except Exception, ex:
169 txn.abort()
170 logtb(str(ex))
171 else:
172 txn.commit()
174 def delete_items(self, feed, items):
175 """ Deletes a list of items.
177 Useful for cutting old items based on number of items stored.
179 txn = self.begin_transaction()
180 try:
181 feed_item_ids = self.get_item_ids(feed.id, txn=txn)
182 # because of bugs, we sometime get here duplicate ids. instead of dying,
183 # warn the user but continue
184 item_ids = []
185 for item in items:
186 item.clean_up()
187 if item.id in item_ids:
188 log("WARNING: skipping duplicate ids in delete items request %s and %s" % (item.title, item.id))
189 # filter out any duplicates
190 feed_item_ids = filter(lambda x: x != item.id, feed_item_ids)
191 continue
192 item_ids.append(item.id)
193 #log("deleting item %d:%d" % (feed.id, item.id))
194 if item.id in feed_item_ids:
195 feed_item_ids.remove(item.id)
196 self._db.delete("%d:%d" % (feed.id, item.id), txn=txn)
197 self.save_feed_item_ids(feed, feed_item_ids, txn)
198 except Exception, ex:
199 txn.abort()
200 log_exc("error while deleting items")
201 else:
202 txn.commit()
204 def modify_items(self, items):
205 txn = self.begin_transaction()
206 try:
207 for item in items:
208 self._db.put("%d:%d" % (item.feed.id, item.id),
209 stringify_item(item), txn=txn)
210 except Exception, ex:
211 txn.abort()
212 logtb(str(ex))
213 else:
214 txn.commit()
216 def get_feed_items(self, feed):
217 txn = self.begin_transaction()
218 items = []
219 try:
220 ids = self.get_item_ids(feed.id, txn=txn)
221 for id in ids:
222 item = self.get_item(feed.id, id, txn=txn)
223 if item is not None:
224 items.append(item)
225 except Exception, ex:
226 txn.abort()
227 log(str(ex))
228 else:
229 txn.commit()
230 return items
232 def get_number_of_unread(self, fid, cutoff):
233 # Used by config conversion
234 # NOTE: this is the number of unread items in 'number of items stored'
235 # preference. Since straw stores the most recent items down the list,
236 # we only count the unread items from the most recent N items,
237 # where N = cutoff.
238 txn = self.begin_transaction()
239 num_unread = 0
240 try:
241 ids = self.get_item_ids(fid, txn=txn)
242 for id in ids[len(ids)-cutoff:]:
243 item = self.get_item(fid, id, txn=txn)
244 if item is not None and item.seen == 0:
245 num_unread += 1
246 else: continue
247 except Exception, ex:
248 txn.abort()
249 logtb(str(ex))
250 else:
251 txn.commit()
252 return num_unread
254 def get_image_urls(self, txn=None):
255 dkeys = self._db.get("images", txn=txn)
256 keys = []
257 if dkeys is not None:
258 keys = pickle.loads(dkeys)
259 return keys
261 def save_image_urls(self, urls, txn=None):
262 self._db.put("images", pickle.dumps(urls), txn=txn)
264 def get_image_counts(self, txn=None):
265 images = self.get_image_urls(txn)
266 counts = []
267 for image in images:
268 key = ("imagecount:" + image).encode('utf-8')
269 value = self._db.get(str(key))
270 try:
271 counts.append((image, int(value)))
272 except:
273 log("exception for ", key, ", type of value ", value, ": ", type(value))
274 return counts
276 def update_image_count(self, url, count):
277 #logparam(locals(), "url", "count")
278 key = ("imagecount:" + url).encode('utf-8')
279 txn = self.begin_transaction()
280 try:
281 if count < 1:
282 self._db.delete(key, txn=txn)
283 else:
284 self._db.put(key, str(count), txn=txn)
285 except:
286 txn.abort()
287 raise
288 else:
289 txn.commit()
291 def update_image(self, url, image):
292 key = "image:%s" % str(url)
293 txn = self.begin_transaction()
294 try:
295 image_urls = self.get_image_urls(txn)
296 if image is not None:
297 self._db.put(key.encode('utf-8'), image, txn=txn)
298 if url not in image_urls:
299 image_urls.append(url)
300 self.save_image_urls(image_urls, txn)
301 else:
302 if url in image_urls:
303 try:
304 self._db.delete(key, txn=txn)
305 except DBNotFoundError:
306 log("Key not found", key)
307 image_urls.remove(url)
308 self.save_image_urls(image_urls, txn=txn)
309 except:
310 txn.abort()
311 raise
312 else:
313 txn.commit()
315 def get_image_data(self, url, txn=None):
316 return self._db.get(
317 "image:%s" % url.encode('utf-8'), default = None, txn=txn)
319 def _image_print(self, key, data):
320 if key[:6] == "image:":
321 print key
323 def _data_print(self, key, data):
324 data = pickle.loads(data)
325 pprint ({key: data})
327 def _db_print(self, helper):
328 """Print the database to stdout for debugging"""
329 print "******** Printing raw database for debugging ********"
330 print "database version: %s" % self.get_db_version()
331 cur = self._db.cursor()
332 try:
333 key, data = cur.first()
334 while 1 :
335 helper(key, data)
336 next = cur.next()
337 if next:
338 key, data = next
339 finally:
340 cur.close()
342 def get_db_version(self, txn=None):
343 version = self._db.get("straw_db_version", default = "1", txn=txn)
344 return int(version)
346 def set_db_version(self, version, txn=None):
347 try:
348 if txn is None:
349 txn = self.begin_transaction()
350 self._db.put("straw_db_version", str(version), txn=txn)
351 except:
352 txn.abort()
353 raise
354 else:
355 txn.commit()
357 def convert_old_versions(self):
358 version = self.get_db_version()
359 while version < self.CURRENT_VERSION:
360 next = version + 1
361 mname = "convert_%d_%d" % (version, next)
362 try:
363 method = getattr(self, mname)
364 except AttributeError:
365 raise ConvertException(version, next, "No conversion function specified")
366 method()
367 self.set_db_version(next)
368 version = next
370 def convert_1_2(self):
371 def is_item(key):
372 parts = key.split(':')
373 if len(parts) != 2:
374 return False
375 return parts[0].isdigit() and parts[1].isdigit()
377 def round_second(ttuple):
378 l = list(ttuple)
379 l[5] = int(round(l[5]))
380 return tuple(l)
382 try:
383 import mx.DateTime as mxd
384 except ImportError:
385 raise ConvertException(1, 2, _("Couldn't import mx.DateTime"))
386 txn = self.begin_transaction()
387 try:
388 cur = self._db.cursor(txn=txn)
389 try:
390 next = cur.first()
391 key = None
392 if next:
393 key, data = cur.first()
394 while key is not None:
395 if is_item(key):
396 dict = pickle.loads(data)
397 if isinstance(dict['pub_date'], mxd.DateTimeType):
398 p = dict['pub_date']
399 t = time.gmtime(time.mktime(round_second(p.tuple())))
400 dict['pub_date'] = t
401 data = pickle.dumps(dict)
402 cur.put(key, data, DB_CURRENT)
403 next = cur.next()
404 if next:
405 key, data = next
406 else:
407 break
408 finally:
409 cur.close()
410 except Exception, ex:
411 txn.abort()
412 raise
413 else:
414 txn.commit()
416 def convert_2_3(self):
417 def is_item(key):
418 parts = key.split(':')
419 if len(parts) != 2:
420 return False
421 return parts[0].isdigit() and parts[1].isdigit()
423 imagelistcursor = None
424 images = {}
425 txn = self.begin_transaction()
426 try:
427 cur = self._db.cursor(txn=txn)
428 try:
429 next = cur.first()
430 key = None
431 if next:
432 key, data = cur.first()
433 while key is not None:
434 if is_item(key):
435 dic = pickle.loads(data)
436 for image in dic['images']:
437 images[image] = images.get(image, 0) + 1
438 elif key == "images":
439 imagelistcursor = cur.dup(DB_POSITION)
440 next = cur.next()
441 if next:
442 key, data = next
443 else:
444 break
445 for image, count in images.items():
446 key = ("imagecount:" + image).encode('utf-8')
447 cur.put(key, str(count), DB_KEYFIRST)
448 imagelistcursor.put("images", pickle.dumps(images.keys()), DB_CURRENT)
449 finally:
450 cur.close()
451 if imagelistcursor != None:
452 imagelistcursor.close()
453 except Exception, ex:
454 txn.abort()
455 raise
456 else:
457 txn.commit()
class ModifyItemAction:
    """Queued action: persist one changed item."""

    def __init__(self, item):
        self._item = item

    def doit(self, db):
        """Write the remembered item to *db* as a one-element batch."""
        db.modify_items([self._item])
class ModifyItemsAction:
    """Queued action: persist a batch of changed items."""

    def __init__(self, items):
        self._items = items

    def doit(self, db):
        """Write all remembered items to *db*."""
        db.modify_items(self._items)
class ItemsAddedAction:
    """Queued action: persist newly arrived items of a feed."""

    def __init__(self, feed, items):
        self._feed = feed
        self._items = items

    def doit(self, db):
        """Add the remembered items to *db* under their feed."""
        db.add_items(self._feed, self._items)
class DeleteItemsAction:
    """Queued action: remove items of a feed from the store."""

    def __init__(self, feed, items):
        self._feed = feed
        self._items = items

    def doit(self, db):
        """Delete the remembered items of the remembered feed from *db*."""
        db.delete_items(self._feed, self._items)
class ImageUpdateAction:
    """Queued action: store (or, with None data, remove) one cached image."""

    def __init__(self, url, image):
        self._url = url
        self._image = image

    def doit(self, db):
        """Forward the remembered url/data pair to *db*."""
        db.update_image(self._url, self._image)
class ImageCountChangedAction:
    """Queued action: update the reference count of a cached image."""

    def __init__(self, url, count):
        self._url = url
        self._count = count

    def doit(self, db):
        """Forward the remembered url/count pair to *db*."""
        db.update_image_count(self._url, self._count)
class ItemStore:
    """Glue between the feed/image signal sources and MyDB.

    Database writes are not performed immediately: every change is queued
    as a small *Action object, and a repeating mainloop timer drains the
    queue and applies the actions to the database (see start()/_run()).
    Reads (read_image, read_feed_items, ...) go to the database directly.
    """

    def __init__(self, dbhome):
        feedlist = feeds.get_instance()
        self._db = MyDB(DATABASE_FILE_NAME, dbhome, create = 1)
        self.connect_signals()
        feedlist.connect('updated', self._feed_created_cb)
        feedlist.connect('deleted', self._feed_deleted_cb)
        ImageCache.cache.signal_connect(Event.ImageUpdatedSignal,
            self.image_updated)
        self._stop = False
        # FIFO of *Action objects waiting to be written to the database
        self._action_queue = []

    def _feed_created_cb(self, flist, feed, *args):
        # a feed appeared in the feed list: start tracking its item signals
        self._connect_feed_signals(feed)

    def _feed_deleted_cb(self, flist, feed):
        self._disconnect_feed_signals(feed)

    def connect_signals(self):
        # hook up every feed already present at startup
        flist = feeds.get_instance().flatten_list()
        for f in flist:
            self._connect_feed_signals(f)

    def _connect_feed_signals(self, feed):
        feed.connect('items-updated', self.items_added)
        feed.connect('items-read', self.all_items_read)
        feed.connect('items-deleted', self.items_deleted)
        # XXX
        #feed.signal_connect(Event.ItemReadSignal, self.item_modified)
        #feed.signal_connect(Event.ItemStickySignal, self.item_modified)

    def _disconnect_feed_signals(self, feed):
        # NOTE(review): currently a no-op -- the gobject handler ids are
        # not stored, so the old signal_disconnect calls below could not
        # be ported to the new connect() API yet.
        #feed.signal_disconnect(Event.NewItemsSignal, self.items_added)
        #feed.signal_disconnect(Event.ItemReadSignal, self.item_modified)
        #feed.signal_disconnect(Event.ItemStickySignal, self.item_modified)
        #feed.signal_disconnect(Event.AllItemsReadSignal, self.all_items_read)
        #feed.signal_disconnect(Event.ItemDeletedSignal, self.item_deleted)
        pass

    def modify_item(self, item):
        # queue a rewrite of a single item's stored record
        self._action_queue.append(ModifyItemAction(item))
        return

    def image_updated(self, signal):
        # image cache changed: queue the raw data (None means deletion)
        self._action_queue.append(
            ImageUpdateAction(signal.url, signal.data))

    def read_image(self, url):
        # synchronous read -- image data is needed immediately for display
        return self._db.get_image_data(url)

    def items_deleted(self, feed, items):
        self._action_queue.append(DeleteItemsAction(feed, items))

    def item_modified(self, signal):
        self.modify_item(signal.item)

    def all_items_read(self, feed, items):
        # 'items-read' delivers the items whose read state changed;
        # persist them as a batch
        self._action_queue.append(ModifyItemsAction(items))

    def items_added(self, feed, items):
        self._action_queue.append(ItemsAddedAction(feed, items))

    def read_feed_items(self, feed):
        # synchronous read of all stored items of a feed
        return self._db.get_feed_items(feed)

    def get_number_of_unread(self, feed_id, cutoff):
        return self._db.get_number_of_unread(feed_id, cutoff)

    def get_image_counts(self):
        return self._db.get_image_counts()

    def set_image_count(self, image, count):
        self._action_queue.append(
            ImageCountChangedAction(image, count))

    def start(self):
        # drain the action queue from a repeating 5-second mainloop timer
        mlmgr = MainloopManager.get_instance()
        mlmgr.set_repeating_timer(5000, self._run)

    def stop(self):
        # stop the timer and close the db; actions queued afterwards are
        # never written
        mlmgr = MainloopManager.get_instance()
        mlmgr.end_repeating_timer(self._run)
        self._db.checkpoint()
        self._db.close()
        self._stop = True

    def _run(self):
        # Timer callback: checkpoint, then flush the pending actions.
        # NOTE(review): the timer/cptimer arithmetic below looks like a
        # leftover from a loop-based design -- every counter is
        # re-initialized on each call, so both 'if' branches normally fire
        # on every invocation.  Confirm before simplifying.
        self._db.checkpoint()
        freq = 5
        timer = freq
        cpfreq = 60
        cptimer = cpfreq
        prevtime = time.time()
        if not self._stop:
            tmptime = time.time()
            timer += tmptime - prevtime
            cptimer += tmptime - prevtime
            prevtime = tmptime
            if timer > freq:
                try:
                    # drain in FIFO order; a queued None acts as a stopper
                    while len(self._action_queue):
                        action = self._action_queue.pop(0)
                        if action is None:
                            break
                        action.doit(self._db)
                except IndexError, e:
                    pass
                timer = 0
            if cptimer > cpfreq:
                self._db.checkpoint()
                cptimer = 0
# process-wide singleton, created lazily by get_instance()
itemstore_instance = None

def get_instance():
    """Return the shared ItemStore, creating it on first use."""
    global itemstore_instance
    if itemstore_instance is not None:
        return itemstore_instance
    # imported here to avoid a circular import at module load time
    import Config
    itemstore_instance = ItemStore(Config.straw_home())
    return itemstore_instance
def stringify_item(item):
    """Serialize one item into a pickled dict of its persistent fields.

    The inverse operation is unstringify_item().
    """
    # most fields are plain attribute copies
    itemdict = dict(
        (name, getattr(item, name)) for name in (
            'title', 'link', 'description', 'guid', 'guidislink',
            'pub_date', 'source', 'seen', 'id', 'fm_license', 'fm_changes',
            'creator', 'contributors', 'license_urls', 'publication_name',
            'publication_volume', 'publication_number',
            'publication_section', 'publication_starting_page',
            'enclosures'))
    # the remaining two are not straight attribute reads
    itemdict['images'] = item.image_keys()
    itemdict['sticky'] = item._sticky
    return pickle.dumps(itemdict)
def unstringify_item(itemstring):
    """Rebuild a SummaryItem from pickled data (inverse of stringify_item).

    Returns None for empty input or when the pickle cannot be read.
    Fields missing from old records fall back to sensible defaults.
    """
    if not itemstring:
        return None
    idict = _unpickle(itemstring)
    if not idict:
        return None

    item = SummaryItem.SummaryItem()
    # fields always present in stored records
    item.title = idict['title']
    item.link = idict['link']
    item.description = idict['description']
    item.guid = idict['guid']
    item.pub_date = idict['pub_date']
    item.source = idict['source']
    for i in idict['images']:
        item.restore_image(i)
    item.seen = idict['seen']
    item.id = idict['id']
    # fields that older schema versions may lack
    item.guidislink = idict.get('guidislink', True)
    for field in ('fm_license', 'fm_changes', 'creator', 'contributors',
                  'license_urls'):
        setattr(item, field, idict.get(field, None))
    item._sticky = idict.get('sticky', 0)
    item.enclosures = idict.get('enclosures', None)
    for field in ('publication_name', 'publication_volume',
                  'publication_number', 'publication_section',
                  'publication_starting_page'):
        setattr(item, field, idict.get(field, None))
    return item
685 def _unpickle(istring):
686 itemdict = None
687 try:
688 itemdict = pickle.loads(istring)
689 except ValueError, ve:
690 log("ItemStore.unstringify_item: pickle.loads raised ValueError, argument was %s" % repr(itemstring))
691 except Exception, ex:
692 logtb(str(ex))
693 return itemdict
if __name__ == '__main__':
    # Debugging entry point: dump the raw contents of the user's item
    # database (~/.straw/itemstore.db) to stdout, one pretty-printed
    # record per key.
    from pprint import pprint
    db = MyDB("itemstore.db", "%s/.straw" % os.getenv('HOME'), create = 1)
    db._db_print(db._data_print)