Fixed item navigation and cleaned up feed and item changes
[straw.git] / src / lib / ItemStore.py
blobd38cb2f8d7177fd871b783a83a45a5ab239aeabb
1 """ ItemStore.py
3 Data store abstraction module.
4 """
5 __copyright__ = "Copyright (c) 2002-2005 Free Software Foundation, Inc."
6 __license__ = """
7 Straw is free software; you can redistribute it and/or modify it under the
8 terms of the GNU General Public License as published by the Free Software
9 Foundation; either version 2 of the License, or (at your option) any later
10 version.
12 Straw is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License along with
17 this program; if not, write to the Free Software Foundation, Inc., 59 Temple
18 Place - Suite 330, Boston, MA 02111-1307, USA. """
20 import cPickle as pickle
21 import os, sys
22 import time
23 from error import *
24 import tempfile
25 import traceback
27 try:
28 from bsddb.db import *
29 import bsddb
30 except ImportError:
31 from bsddb3.db import *
32 import bsddb3 as bsddb
34 import Event
35 import SummaryItem
36 import feeds
37 import ImageCache
38 from MainloopManager import MainloopManager
40 DATABASE_FILE_NAME = "itemstore.db"
class ConvertException(Exception):
    """Raised when migrating the item database between schema versions fails.

    Attributes:
        version1 -- schema version the conversion started from
        version2 -- schema version the conversion was targeting
        reason   -- human-readable explanation (often a traceback file path)
    """
    def __init__(self, version1, version2, reason):
        self.version1, self.version2, self.reason = version1, version2, reason
48 class MyDB:
49 CURRENT_VERSION = 3
51 def __init__(self, filename, dbhome, create=0, truncate=0, mode=0600,
52 recover=0, dbflags=0):
53 self._db = None
54 self._env = None
55 recoverenv = DB_CREATE | DB_RECOVER
56 # DB_INIT_TXN automatically enables logging
57 flagsforenv = DB_INIT_TXN | DB_INIT_MPOOL | DB_INIT_LOCK | DB_PRIVATE
59 self._env = DBEnv()
60 self._env.set_data_dir(dbhome)
61 self._env.set_lk_detect(DB_LOCK_DEFAULT) # enable auto deadlock avoidance
62 self._env.set_lg_max(2**20)
63 self._env.set_lk_max_locks (10000)
64 self._env.set_lk_max_objects (10000)
66 try:
67 self._env.open(dbhome, recoverenv | flagsforenv, mode)
68 except bsddb._db.DBRunRecoveryError, err:
69 self._env.remove(dbhome)
70 self._env.close()
71 log("%s" % err[1])
72 sys.exit("Recovery Error: See README for details on how to recover data. ")
74 flags = 0
75 if truncate:
76 flags |= DB_TRUNCATE
78 try:
79 flags |= DB_AUTO_COMMIT
80 except NameError:
81 pass
83 try:
84 self._db = DB(self._env)
85 self._db.open(filename, DB_BTREE, flags, mode)
86 except bsddb._db.DBNoSuchFileError:
87 if create:
88 self._db = DB(self._env)
89 self._db.open(filename, DB_BTREE, flags | DB_CREATE, mode)
90 self.set_db_version(self.CURRENT_VERSION)
91 else:
92 raise
93 try:
94 self.convert_old_versions()
95 except Exception, ex:
96 try:
97 filename = tempfile.mktemp(prefix="straw-")
98 fh = open(filename, "w")
99 traceback.print_exc(None, fh)
100 raise ConvertException(self.get_db_version(),
101 MyDB.CURRENT_VERSION, "%s" % filename)
102 finally:
103 fh.close()
105 def close(self):
106 if self._db is not None:
107 self._db.close()
108 self._db = None
109 if self._env is not None:
110 self._env.close()
111 self._env = None
113 def checkpoint(self):
114 # set flags to 0 or DB_FORCE, else will raise EINVAL (InvalidArgError)
115 cpflags = 0
116 self._env.txn_checkpoint(cpflags | DB_FORCE )
117 deletees = self._env.log_archive(DB_ARCH_ABS)
118 for d in deletees:
119 os.remove(d)
121 def begin_transaction(self):
122 return self._env.txn_begin()
124 def get_item_ids(self, iid, txn):
125 key = "fids:%d" % iid
126 dids = self._db.get(key, txn=txn)
127 ids = []
128 if dids:
129 ids = pickle.loads(dids)
130 return ids
132 def save_feed_item_ids(self, feed, ids, txn=None):
133 rowid = "fids:%d" % feed.id
134 commit = 0
135 if not txn:
136 txn = self.begin_transaction()
137 commit = 1
138 try:
139 try:
140 self._db.delete(rowid, txn=txn)
141 except DBNotFoundError:
142 pass
143 self._db.put(rowid, pickle.dumps(ids), txn=txn)
144 except Exception, ex:
145 if commit:
146 txn.abort()
147 logtb(str(ex))
148 else:
149 if commit:
150 txn.commit()
152 def get_item(self, feed_id, item_id, txn=None):
153 item = self._db.get("%d:%d" % (feed_id, item_id), txn=txn)
154 return unstringify_item(item)
156 def add_items(self, feed, items):
157 txn = self.begin_transaction()
158 try:
159 feed_item_ids = self.get_item_ids(feed.id, txn=txn)
160 for item in items:
161 self._db.put("%d:%d" % (item.feed.id, item.id), stringify_item(item), txn=txn)
162 # TODO: it might be a good idea to check here that we don't add
163 # duplicate items. It doesn't happen normally, but there can be
164 # bugs that trigger that. Throwing an exception would be the
165 # the right thing: it wouldn't hide the breakage.
166 feed_item_ids.append(item.id)
167 self.save_feed_item_ids(feed, feed_item_ids, txn)
168 except Exception, ex:
169 txn.abort()
170 logtb(str(ex))
171 else:
172 txn.commit()
174 def delete_items(self, feed, items):
175 """ Deletes a list of items.
177 Useful for cutting old items based on number of items stored.
179 txn = self.begin_transaction()
180 try:
181 feed_item_ids = self.get_item_ids(feed.id, txn=txn)
182 # because of bugs, we sometime get here duplicate ids. instead of dying,
183 # warn the user but continue
184 item_ids = []
185 for item in items:
186 item.clean_up()
187 if item.id in item_ids:
188 log("WARNING: skipping duplicate ids in delete items request %s and %s" % (item.title, item.id))
189 # filter out any duplicates
190 feed_item_ids = filter(lambda x: x != item.id, feed_item_ids)
191 continue
192 item_ids.append(item.id)
193 #log("deleting item %d:%d" % (feed.id, item.id))
194 if item.id in feed_item_ids:
195 feed_item_ids.remove(item.id)
196 self._db.delete("%d:%d" % (feed.id, item.id), txn=txn)
197 self.save_feed_item_ids(feed, feed_item_ids, txn)
198 except Exception, ex:
199 txn.abort()
200 log_exc("error while deleting items")
201 else:
202 txn.commit()
204 def modify_items(self, items):
205 txn = self.begin_transaction()
206 try:
207 for item in items:
208 self._db.put("%d:%d" % (item.feed.id, item.id),
209 stringify_item(item), txn=txn)
210 except Exception, ex:
211 txn.abort()
212 logtb(str(ex))
213 else:
214 txn.commit()
216 def get_feed_items(self, feed):
217 txn = self.begin_transaction()
218 items = []
219 try:
220 ids = self.get_item_ids(feed.id, txn=txn)
221 for id in ids:
222 item = self.get_item(feed.id, id, txn=txn)
223 if item is not None:
224 items.append(item)
225 except Exception, ex:
226 txn.abort()
227 raise
228 #XXX log(str(ex))
229 else:
230 txn.commit()
231 return items
233 def get_number_of_unread(self, fid, cutoff):
234 # Used by config conversion
235 # NOTE: this is the number of unread items in 'number of items stored'
236 # preference. Since straw stores the most recent items down the list,
237 # we only count the unread items from the most recent N items,
238 # where N = cutoff.
239 txn = self.begin_transaction()
240 num_unread = 0
241 try:
242 ids = self.get_item_ids(fid, txn=txn)
243 for id in ids[len(ids)-cutoff:]:
244 item = self.get_item(fid, id, txn=txn)
245 if item is not None and item.seen == 0:
246 num_unread += 1
247 else: continue
248 except Exception, ex:
249 txn.abort()
250 logtb(str(ex))
251 else:
252 txn.commit()
253 return num_unread
255 def get_image_urls(self, txn=None):
256 dkeys = self._db.get("images", txn=txn)
257 keys = []
258 if dkeys is not None:
259 keys = pickle.loads(dkeys)
260 return keys
262 def save_image_urls(self, urls, txn=None):
263 self._db.put("images", pickle.dumps(urls), txn=txn)
265 def get_image_counts(self, txn=None):
266 images = self.get_image_urls(txn)
267 counts = []
268 for image in images:
269 key = ("imagecount:" + image).encode('utf-8')
270 value = self._db.get(str(key))
271 try:
272 counts.append((image, int(value)))
273 except:
274 log("exception for ", key, ", type of value ", value, ": ", type(value))
275 return counts
277 def update_image_count(self, url, count):
278 #logparam(locals(), "url", "count")
279 key = ("imagecount:" + url).encode('utf-8')
280 txn = self.begin_transaction()
281 try:
282 if count < 1:
283 self._db.delete(key, txn=txn)
284 else:
285 self._db.put(key, str(count), txn=txn)
286 except:
287 txn.abort()
288 raise
289 else:
290 txn.commit()
292 def update_image(self, url, image):
293 key = "image:%s" % str(url)
294 txn = self.begin_transaction()
295 try:
296 image_urls = self.get_image_urls(txn)
297 if image:
298 self._db.put(key.encode('utf-8'), image, txn=txn)
299 if url not in image_urls:
300 image_urls.append(url)
301 self.save_image_urls(image_urls, txn)
302 else:
303 if url in image_urls:
304 try:
305 self._db.delete(key, txn=txn)
306 except DBNotFoundError:
307 log("Key not found", key)
308 image_urls.remove(url)
309 self.save_image_urls(image_urls, txn=txn)
310 except:
311 txn.abort()
312 raise
313 else:
314 txn.commit()
316 def get_image_data(self, url, txn=None):
317 return self._db.get(
318 "image:%s" % url.encode('utf-8'), default = None, txn=txn)
320 def _image_print(self, key, data):
321 if key[:6] == "image:":
322 print key
324 def _data_print(self, key, data):
325 data = pickle.loads(data)
326 pprint ({key: data})
328 def _db_print(self, helper):
329 """Print the database to stdout for debugging"""
330 print "******** Printing raw database for debugging ********"
331 print "database version: %s" % self.get_db_version()
332 cur = self._db.cursor()
333 try:
334 key, data = cur.first()
335 while 1 :
336 helper(key, data)
337 next = cur.next()
338 if next:
339 key, data = next
340 finally:
341 cur.close()
343 def get_db_version(self, txn=None):
344 version = self._db.get("straw_db_version", default = "1", txn=txn)
345 return int(version)
347 def set_db_version(self, version, txn=None):
348 try:
349 if txn is None:
350 txn = self.begin_transaction()
351 self._db.put("straw_db_version", str(version), txn=txn)
352 except:
353 txn.abort()
354 raise
355 else:
356 txn.commit()
358 def convert_old_versions(self):
359 version = self.get_db_version()
360 while version < self.CURRENT_VERSION:
361 next = version + 1
362 mname = "convert_%d_%d" % (version, next)
363 try:
364 method = getattr(self, mname)
365 except AttributeError:
366 raise ConvertException(version, next, "No conversion function specified")
367 method()
368 self.set_db_version(next)
369 version = next
371 def convert_1_2(self):
372 def is_item(key):
373 parts = key.split(':')
374 if len(parts) != 2:
375 return False
376 return parts[0].isdigit() and parts[1].isdigit()
378 def round_second(ttuple):
379 l = list(ttuple)
380 l[5] = int(round(l[5]))
381 return tuple(l)
383 try:
384 import mx.DateTime as mxd
385 except ImportError:
386 raise ConvertException(1, 2, _("Couldn't import mx.DateTime"))
387 txn = self.begin_transaction()
388 try:
389 cur = self._db.cursor(txn=txn)
390 try:
391 next = cur.first()
392 key = None
393 if next:
394 key, data = cur.first()
395 while key is not None:
396 if is_item(key):
397 dict = pickle.loads(data)
398 if isinstance(dict['pub_date'], mxd.DateTimeType):
399 p = dict['pub_date']
400 t = time.gmtime(time.mktime(round_second(p.tuple())))
401 dict['pub_date'] = t
402 data = pickle.dumps(dict)
403 cur.put(key, data, DB_CURRENT)
404 next = cur.next()
405 if next:
406 key, data = next
407 else:
408 break
409 finally:
410 cur.close()
411 except Exception, ex:
412 txn.abort()
413 raise
414 else:
415 txn.commit()
417 def convert_2_3(self):
418 def is_item(key):
419 parts = key.split(':')
420 if len(parts) != 2:
421 return False
422 return parts[0].isdigit() and parts[1].isdigit()
424 imagelistcursor = None
425 images = {}
426 txn = self.begin_transaction()
427 try:
428 cur = self._db.cursor(txn=txn)
429 try:
430 next = cur.first()
431 key = None
432 if next:
433 key, data = cur.first()
434 while key is not None:
435 if is_item(key):
436 dic = pickle.loads(data)
437 for image in dic['images']:
438 images[image] = images.get(image, 0) + 1
439 elif key == "images":
440 imagelistcursor = cur.dup(DB_POSITION)
441 next = cur.next()
442 if next:
443 key, data = next
444 else:
445 break
446 for image, count in images.items():
447 key = ("imagecount:" + image).encode('utf-8')
448 cur.put(key, str(count), DB_KEYFIRST)
449 imagelistcursor.put("images", pickle.dumps(images.keys()), DB_CURRENT)
450 finally:
451 cur.close()
452 if imagelistcursor != None:
453 imagelistcursor.close()
454 except Exception, ex:
455 txn.abort()
456 raise
457 else:
458 txn.commit()
class ModifyItemsAction:
    """Queued action: re-persist a batch of changed items."""

    def __init__(self, items):
        self._items = items

    def doit(self, db):
        """Apply this action against *db* (a MyDB instance)."""
        db.modify_items(self._items)
class ItemsAddedAction:
    """Queued action: store newly arrived items for a feed."""

    def __init__(self, feed, items):
        self._feed = feed
        self._items = items

    def doit(self, db):
        """Apply this action against *db* (a MyDB instance)."""
        db.add_items(self._feed, self._items)
class DeleteItemsAction:
    """Queued action: remove a batch of items belonging to a feed."""

    def __init__(self, feed, items):
        self._feed = feed
        self._items = items

    def doit(self, db):
        """Apply this action against *db* (a MyDB instance)."""
        db.delete_items(self._feed, self._items)
class ImageUpdateAction:
    """Queued action: store (or, with falsy data, remove) one image."""

    def __init__(self, url, image):
        self._url = url
        self._image = image

    def doit(self, db):
        """Apply this action against *db* (a MyDB instance)."""
        db.update_image(self._url, self._image)
class ImageCountChangedAction:
    """Queued action: record a new reference count for an image url."""

    def __init__(self, url, count):
        self._url = url
        self._count = count

    def doit(self, db):
        """Apply this action against *db* (a MyDB instance)."""
        db.update_image_count(self._url, self._count)
class ItemStore:
    """Persists feed items, images and image counts to the item database.

    Writes are queued as small action objects (ModifyItemsAction etc.) from
    GTK signal handlers and flushed to MyDB by a repeating timer, so the
    handlers themselves return quickly.
    """

    def __init__(self, dbhome):
        # dbhome: directory for the Berkeley DB environment (straw home dir)
        feedlist = feeds.get_instance()
        self._db = MyDB(DATABASE_FILE_NAME, dbhome, create = 1)
        self.connect_signals()
        feedlist.connect('updated', self._feed_created_cb)
        feedlist.connect('deleted', self._feed_deleted_cb)
        ImageCache.cache.connect('image-updated', self.image_updated)
        self._stop = False
        self._action_queue = []

    def _feed_created_cb(self, flist, feed, *args):
        # newly added feeds must also have their item signals persisted
        self._connect_feed_signals(feed)

    def _feed_deleted_cb(self, flist, feed):
        # XXX FIX THIS
        pass

    def connect_signals(self):
        # subscribe to the item signals of every feed already in the list
        flist = feeds.get_instance().flatten_list()
        for f in flist:
            self._connect_feed_signals(f)

    def _connect_feed_signals(self, feed):
        feed.connect('items-added', self.items_added_cb)
        feed.connect('items-changed', self.items_changed_cb)
        feed.connect('items-deleted', self.items_deleted_cb)

    def items_deleted_cb(self, feed, items):
        # queue the deletion; executed later in _run
        self._action_queue.append(DeleteItemsAction(feed, items))

    def items_added_cb(self, feed, items):
        # queue the insertion; executed later in _run
        self._action_queue.append(ItemsAddedAction(feed, items))

    def items_changed_cb(self, feed, items):
        # queue the update; executed later in _run
        self._action_queue.append(ModifyItemsAction(items))

    def image_updated(self, cache, url, data):
        # queue image data written by the ImageCache
        self._action_queue.append(
            ImageUpdateAction(url, data))

    def read_image(self, url):
        # synchronous read -- bypasses the action queue
        return self._db.get_image_data(url)

    def read_feed_items(self, feed):
        # synchronous read -- bypasses the action queue
        return self._db.get_feed_items(feed)

    def get_number_of_unread(self, feed_id, cutoff):
        return self._db.get_number_of_unread(feed_id, cutoff)

    def get_image_counts(self):
        return self._db.get_image_counts()

    def set_image_count(self, image, count):
        self._action_queue.append(
            ImageCountChangedAction(image, count))

    def start(self):
        # flush the queue every 5 seconds on the GTK main loop
        mlmgr = MainloopManager.get_instance()
        mlmgr.set_repeating_timer(5000, self._run)

    def stop(self):
        # NOTE(review): _stop is set only after the DB is closed; any _run
        # call racing this would hit a closed DB -- presumably the timer is
        # fully cancelled by end_repeating_timer first. Confirm.
        mlmgr = MainloopManager.get_instance()
        mlmgr.end_repeating_timer(self._run)
        self._db.checkpoint()
        self._db.close()
        self._stop = True

    def _run(self):
        # Timer callback: drain the action queue into the database.
        # NOTE(review): freq/timer/cpfreq/cptimer/prevtime are locals
        # re-initialized on every tick, so this elapsed-time bookkeeping is
        # vestigial (it reads like code ported from a loop-based thread).
        # In practice timer > freq and cptimer > cpfreq are true on every
        # call, so each 5 s tick checkpoints, drains the queue, and
        # checkpoints again.
        self._db.checkpoint()
        freq = 5
        timer = freq
        cpfreq = 60
        cptimer = cpfreq
        prevtime = time.time()
        if not self._stop:
            tmptime = time.time()
            timer += tmptime - prevtime
            cptimer += tmptime - prevtime
            prevtime = tmptime
            if timer > freq:
                try:
                    # a None sentinel in the queue stops the drain early
                    while len(self._action_queue):
                        action = self._action_queue.pop(0)
                        if action is None:
                            break
                        action.doit(self._db)
                except IndexError, e:
                    pass
                timer = 0
            if cptimer > cpfreq:
                self._db.checkpoint()
                cptimer = 0
itemstore_instance = None

def get_instance():
    """Return the process-wide ItemStore, creating it on first use."""
    global itemstore_instance
    if itemstore_instance is not None:
        return itemstore_instance
    import Config
    itemstore_instance = ItemStore(Config.straw_home())
    return itemstore_instance
def stringify_item(item):
    """Serialize a SummaryItem into a pickled dict of plain values.

    Inverse of unstringify_item; only picklable attribute values are kept.
    """
    state = {}
    state['title'] = item.title
    state['link'] = item.link
    state['description'] = item.description
    state['guid'] = item.guid
    state['guidislink'] = item.guidislink
    state['pub_date'] = item.pub_date
    state['source'] = item.source
    state['images'] = item.image_keys()
    state['seen'] = item.seen
    state['id'] = item.id
    state['fm_license'] = item.fm_license
    state['fm_changes'] = item.fm_changes
    state['creator'] = item.creator
    state['contributors'] = item.contributors
    state['license_urls'] = item.license_urls
    state['publication_name'] = item.publication_name
    state['publication_volume'] = item.publication_volume
    state['publication_number'] = item.publication_number
    state['publication_section'] = item.publication_section
    state['publication_starting_page'] = item.publication_starting_page
    state['sticky'] = item._sticky
    state['enclosures'] = item.enclosures
    return pickle.dumps(state)
def unstringify_item(itemstring):
    """Rebuild a SummaryItem from a pickled dict produced by stringify_item.

    Returns None for an empty string or when unpickling fails.
    """
    if not itemstring:
        return None

    idict = _unpickle(itemstring)
    if not idict:
        return None

    item = SummaryItem.SummaryItem()
    # mandatory fields -- present in every stored record
    for attr in ('title', 'link', 'description', 'guid', 'pub_date', 'source'):
        setattr(item, attr, idict[attr])
    for image_key in idict['images']:
        item.restore_image(image_key)
    item.seen = idict['seen']
    item.id = idict['id']
    # optional fields -- added by later versions, so default when absent
    item.guidislink = idict.get('guidislink', True)
    for attr in ('fm_license', 'fm_changes', 'creator', 'contributors',
                 'license_urls'):
        setattr(item, attr, idict.get(attr, None))
    item._sticky = idict.get('sticky', 0)
    item.enclosures = idict.get('enclosures', None)
    for attr in ('publication_name', 'publication_volume',
                 'publication_number', 'publication_section',
                 'publication_starting_page'):
        setattr(item, attr, idict.get(attr, None))
    return item
661 def _unpickle(istring):
662 itemdict = None
663 try:
664 itemdict = pickle.loads(istring)
665 except ValueError, ve:
666 log("ItemStore.unstringify_item: pickle.loads raised ValueError, argument was %s" % repr(itemstring))
667 except Exception, ex:
668 logtb(str(ex))
669 return itemdict
if __name__ == '__main__':
    # Debugging aid: open the current user's item database read-write and
    # dump every record (unpickled, pretty-printed) to stdout.
    from pprint import pprint
    db = MyDB("itemstore.db", "%s/.straw" % os.getenv('HOME'), create = 1)
    db._db_print(db._data_print)