Make treeview refresh properly when adding nodes.
[straw/fork.git] / straw / old_ItemStore.py
blob62632d36d998a5f31c712697a162a6d55b677272
1 """ ItemStore.py
3 Data store abstraction module.
4 """
5 __copyright__ = "Copyright (c) 2002-2005 Free Software Foundation, Inc."
6 __license__ = """
7 Straw is free software; you can redistribute it and/or modify it under the
8 terms of the GNU General Public License as published by the Free Software
9 Foundation; either version 2 of the License, or (at your option) any later
10 version.
12 Straw is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License along with
17 this program; if not, write to the Free Software Foundation, Inc., 59 Temple
18 Place - Suite 330, Boston, MA 02111-1307, USA. """
20 import cPickle as pickle
21 import os, sys
22 import time
23 from error import *
24 import tempfile
25 import traceback
27 try:
28 from bsddb.db import *
29 import bsddb
30 except ImportError:
31 from bsddb3.db import *
32 import bsddb3 as bsddb
34 import Event
35 import SummaryItem
36 import feeds
37 from MainloopManager import MainloopManager
39 DATABASE_FILE_NAME = "itemstore.db"
class ConvertException(Exception):
    """Raised when upgrading the item database between schema versions fails.

    Attributes:
        version1 -- schema version the conversion started from
        version2 -- schema version the conversion was targeting
        reason   -- human-readable detail (e.g. path of a traceback dump)
    """

    def __init__(self, version1, version2, reason):
        # BUGFIX: previously the base Exception was never initialized, so
        # str(exc) was empty and tracebacks carried no information.
        Exception.__init__(
            self, "conversion from version %s to version %s failed: %s"
            % (version1, version2, reason))
        self.version1 = version1
        self.version2 = version2
        self.reason = reason
class MyDB:
    """Transactional wrapper around the Berkeley DB file that stores items.

    All records live in a single BTree database under flat string keys:
      "fids:<feed_id>"       -> pickled list of item ids for that feed
      "<feed_id>:<item_id>"  -> pickled item dict (see stringify_item)
      "images"               -> pickled list of known image URLs
      "imagecount:<url>"     -> image reference count (stringified int)
      "image:<url>"          -> raw image data
      "straw_db_version"     -> schema version (stringified int)
    """

    # Schema version stamped on newly created databases; older files are
    # upgraded step by step by convert_old_versions().
    CURRENT_VERSION = 3

    def __init__(self, filename, dbhome, create=0, truncate=0, mode=0600,
                 recover=0, dbflags=0):
        """Open (optionally create) the database inside a DB environment
        rooted at dbhome, then upgrade its schema if it is old.

        NOTE(review): the 'recover' and 'dbflags' parameters are accepted
        but never used in this body.
        """
        self._db = None
        self._env = None
        recoverenv = DB_CREATE | DB_RECOVER
        # DB_INIT_TXN automatically enables logging
        flagsforenv = DB_INIT_TXN | DB_INIT_MPOOL | DB_INIT_LOCK | DB_PRIVATE
        self._env = DBEnv()
        self._env.set_data_dir(dbhome)
        self._env.set_lk_detect(DB_LOCK_DEFAULT) # enable auto deadlock avoidance
        self._env.set_lg_max(2**20)
        self._env.set_lk_max_locks(10000)
        self._env.set_lk_max_objects(10000)
        try:
            self._env.open(dbhome, recoverenv | flagsforenv, mode)
        except bsddb._db.DBRunRecoveryError, err:
            # Automatic recovery failed: discard the environment and stop.
            self._env.remove(dbhome)
            self._env.close()
            log("%s" % err[1])
            sys.exit("Recovery Error: See README for details on how to recover data. ")
        flags = 0
        if truncate:
            flags |= DB_TRUNCATE
        try:
            # DB_AUTO_COMMIT only exists in newer bsddb releases; fall back
            # silently when the name is undefined.
            flags |= DB_AUTO_COMMIT
        except NameError:
            pass
        try:
            self._db = DB(self._env)
            self._db.open(filename, DB_BTREE, flags, mode)
        except bsddb._db.DBNoSuchFileError:
            if create:
                self._db = DB(self._env)
                self._db.open(filename, DB_BTREE, flags | DB_CREATE, mode)
                # Fresh database: stamp it with the current schema version.
                self.set_db_version(self.CURRENT_VERSION)
            else:
                raise
        try:
            self.convert_old_versions()
        except Exception, ex:
            # Dump the traceback to a temp file and hand its path to the
            # user via ConvertException.
            # NOTE(review): if mktemp()/open() itself raises, 'fh' is unbound
            # and the finally clause raises NameError; tempfile.mktemp is
            # also race-prone (mkstemp would be safer).
            try:
                filename = tempfile.mktemp(prefix="straw-")
                fh = open(filename, "w")
                traceback.print_exc(None, fh)
                raise ConvertException(self.get_db_version(),
                                       MyDB.CURRENT_VERSION, "%s" % filename)
            finally:
                fh.close()

    def close(self):
        """Close the database and its environment (safe to call twice)."""
        if self._db is not None:
            self._db.close()
            self._db = None
        if self._env is not None:
            self._env.close()
            self._env = None

    def checkpoint(self):
        """Checkpoint the transaction log and remove stale log files."""
        # set flags to 0 or DB_FORCE, else will raise EINVAL (InvalidArgError)
        cpflags = 0
        self._env.txn_checkpoint(cpflags | DB_FORCE)
        deletees = self._env.log_archive(DB_ARCH_ABS)
        for d in deletees:
            os.remove(d)

    def begin_transaction(self):
        """Start and return a new transaction in this environment."""
        return self._env.txn_begin()

    def get_item_ids(self, iid, txn):
        """Return the stored list of item ids for feed 'iid' ([] if none)."""
        key = "fids:%d" % iid
        dids = self._db.get(key, txn=txn)
        ids = []
        if dids:
            ids = pickle.loads(dids)
        return ids

    def save_feed_item_ids(self, feed, ids, txn=None):
        """Overwrite the id list for 'feed'.

        Joins the caller's transaction when one is given; otherwise opens
        its own and commits (or aborts) it here.
        """
        rowid = "fids:%d" % feed.id
        commit = 0
        if not txn:
            txn = self.begin_transaction()
            commit = 1
        try:
            try:
                # Delete-then-put: replace the old pickled list outright.
                self._db.delete(rowid, txn=txn)
            except DBNotFoundError:
                pass
            self._db.put(rowid, pickle.dumps(ids), txn=txn)
        except Exception, ex:
            if commit:
                txn.abort()
            logtb(str(ex))
        else:
            if commit:
                txn.commit()

    def get_item(self, feed_id, item_id, txn=None):
        """Load one item; returns None when missing or unreadable."""
        item = self._db.get("%d:%d" % (feed_id, item_id), txn=txn)
        return unstringify_item(item)

    def add_items(self, feed, items):
        """Store new items for 'feed' and append their ids to its id list."""
        txn = self.begin_transaction()
        try:
            feed_item_ids = self.get_item_ids(feed.id, txn=txn)
            for item in items:
                self._db.put("%d:%d" % (item.feed.id, item.id), stringify_item(item), txn=txn)
                # TODO: it might be a good idea to check here that we don't add
                # duplicate items. It doesn't happen normally, but there can be
                # bugs that trigger that. Throwing an exception would be the
                # the right thing: it wouldn't hide the breakage.
                feed_item_ids.append(item.id)
            self.save_feed_item_ids(feed, feed_item_ids, txn)
        except Exception, ex:
            txn.abort()
            logtb(str(ex))
        else:
            txn.commit()

    def delete_items(self, feed, items):
        """Deletes a list of items.

        Useful for cutting old items based on number of items stored.
        """
        txn = self.begin_transaction()
        try:
            feed_item_ids = self.get_item_ids(feed.id, txn=txn)
            # because of bugs, we sometime get here duplicate ids. instead of dying,
            # warn the user but continue
            item_ids = []
            for item in items:
                item.clean_up()
                if item.id in item_ids:
                    log("WARNING: skipping duplicate ids in delete items request %s and %s" % (item.title, item.id))
                    # filter out any duplicates
                    feed_item_ids = filter(lambda x: x != item.id, feed_item_ids)
                    continue
                item_ids.append(item.id)
                #log("deleting item %d:%d" % (feed.id, item.id))
                if item.id in feed_item_ids:
                    feed_item_ids.remove(item.id)
                self._db.delete("%d:%d" % (feed.id, item.id), txn=txn)
            self.save_feed_item_ids(feed, feed_item_ids, txn)
        except Exception, ex:
            txn.abort()
            log_exc("error while deleting items")
        else:
            txn.commit()

    def modify_items(self, items):
        """Rewrite already-stored items in place."""
        txn = self.begin_transaction()
        try:
            for item in items:
                self._db.put("%d:%d" % (item.feed.id, item.id),
                             stringify_item(item), txn=txn)
        except Exception, ex:
            txn.abort()
            logtb(str(ex))
        else:
            txn.commit()

    def get_feed_items(self, feed):
        """Return every stored item of 'feed', in id-list order."""
        txn = self.begin_transaction()
        items = []
        try:
            ids = self.get_item_ids(feed.id, txn=txn)
            for id in ids:
                item = self.get_item(feed.id, id, txn=txn)
                if item is not None:
                    items.append(item)
        except Exception, ex:
            txn.abort()
            raise
            #XXX log(str(ex))
        else:
            txn.commit()
        return items

    def get_number_of_unread(self, fid, cutoff):
        """Count unread items among the most recent 'cutoff' ids of feed 'fid'."""
        # Used by config conversion
        # NOTE: this is the number of unread items in 'number of items stored'
        # preference. Since straw stores the most recent items down the list,
        # we only count the unread items from the most recent N items,
        # where N = cutoff.
        txn = self.begin_transaction()
        num_unread = 0
        try:
            ids = self.get_item_ids(fid, txn=txn)
            for id in ids[len(ids)-cutoff:]:
                item = self.get_item(fid, id, txn=txn)
                if item is not None and item.seen == 0:
                    num_unread += 1
                else: continue
        except Exception, ex:
            txn.abort()
            logtb(str(ex))
        else:
            txn.commit()
        return num_unread

    def get_image_urls(self, txn=None):
        """Return the stored list of image URLs ([] when none recorded)."""
        dkeys = self._db.get("images", txn=txn)
        keys = []
        if dkeys is not None:
            keys = pickle.loads(dkeys)
        return keys

    def save_image_urls(self, urls, txn=None):
        """Persist the list of known image URLs."""
        self._db.put("images", pickle.dumps(urls), txn=txn)

    def get_image_counts(self, txn=None):
        """Return (url, refcount) pairs for every known image."""
        images = self.get_image_urls(txn)
        counts = []
        for image in images:
            key = ("imagecount:" + image).encode('utf-8')
            value = self._db.get(str(key))
            try:
                counts.append((image, int(value)))
            except:
                # NOTE(review): bare except; also this log() call passes many
                # positional arguments, unlike the single-string calls
                # elsewhere -- verify log's signature.
                log("exception for ", key, ", type of value ", value, ": ", type(value))
        return counts

    def update_image_count(self, url, count):
        """Set an image's reference count, or remove it when count < 1."""
        #logparam(locals(), "url", "count")
        key = ("imagecount:" + url).encode('utf-8')
        txn = self.begin_transaction()
        try:
            if count < 1:
                self._db.delete(key, txn=txn)
            else:
                self._db.put(key, str(count), txn=txn)
        except:
            txn.abort()
            raise
        else:
            txn.commit()

    def update_image(self, url, image):
        """Store image data and register its URL; when 'image' is falsy,
        drop both the data and the URL registration."""
        key = "image:%s" % str(url)
        txn = self.begin_transaction()
        try:
            image_urls = self.get_image_urls(txn)
            if image:
                self._db.put(key.encode('utf-8'), image, txn=txn)
                if url not in image_urls:
                    image_urls.append(url)
                    self.save_image_urls(image_urls, txn)
            else:
                if url in image_urls:
                    try:
                        # NOTE(review): put() above used the utf-8 encoded
                        # key but delete() uses the unencoded one -- these
                        # differ for non-ASCII URLs; confirm intended.
                        self._db.delete(key, txn=txn)
                    except DBNotFoundError:
                        log("Key not found", key)
                    image_urls.remove(url)
                    self.save_image_urls(image_urls, txn=txn)
        except:
            txn.abort()
            raise
        else:
            txn.commit()

    def get_image_data(self, url, txn=None):
        """Return raw image bytes for 'url', or None when absent."""
        return self._db.get(
            "image:%s" % url.encode('utf-8'), default=None, txn=txn)

    def _image_print(self, key, data):
        # _db_print helper: list the keys that hold raw image data.
        if key[:6] == "image:":
            print key

    def _data_print(self, key, data):
        # _db_print helper: unpickle and pretty-print every record.
        # NOTE(review): relies on pprint being in scope (imported only in
        # the __main__ block) and on every value being a valid pickle.
        data = pickle.loads(data)
        pprint({key: data})

    def _db_print(self, helper):
        """Print the database to stdout for debugging"""
        print "******** Printing raw database for debugging ********"
        print "database version: %s" % self.get_db_version()
        cur = self._db.cursor()
        try:
            key, data = cur.first()
            while 1:
                helper(key, data)
                next = cur.next()
                if next:
                    key, data = next
                # NOTE(review): no 'else: break' here (compare convert_1_2):
                # once the cursor is exhausted, this keeps re-printing the
                # last record forever.
        finally:
            cur.close()

    def get_db_version(self, txn=None):
        """Return the schema version of this database (1 when unstamped)."""
        version = self._db.get("straw_db_version", default="1", txn=txn)
        return int(version)

    def set_db_version(self, version, txn=None):
        """Stamp the database with schema 'version'.

        NOTE(review): when the caller passes its own txn, this method still
        commits (or aborts) that transaction on the caller's behalf.
        """
        try:
            if txn is None:
                txn = self.begin_transaction()
            self._db.put("straw_db_version", str(version), txn=txn)
        except:
            txn.abort()
            raise
        else:
            txn.commit()

    def convert_old_versions(self):
        """Run convert_<n>_<n+1> upgrade steps until CURRENT_VERSION."""
        version = self.get_db_version()
        while version < self.CURRENT_VERSION:
            next = version + 1
            mname = "convert_%d_%d" % (version, next)
            try:
                method = getattr(self, mname)
            except AttributeError:
                raise ConvertException(version, next, "No conversion function specified")
            method()
            self.set_db_version(next)
            version = next

    def convert_1_2(self):
        """v1 -> v2: replace mx.DateTime pub_date values with time tuples."""
        def is_item(key):
            # Item records have "<feed_id>:<item_id>" keys.
            parts = key.split(':')
            if len(parts) != 2:
                return False
            return parts[0].isdigit() and parts[1].isdigit()

        def round_second(ttuple):
            # mx.DateTime stores seconds as a float; time.mktime wants ints.
            l = list(ttuple)
            l[5] = int(round(l[5]))
            return tuple(l)

        try:
            import mx.DateTime as mxd
        except ImportError:
            raise ConvertException(1, 2, _("Couldn't import mx.DateTime"))
        txn = self.begin_transaction()
        try:
            cur = self._db.cursor(txn=txn)
            try:
                next = cur.first()
                key = None
                if next:
                    key, data = cur.first()
                # NOTE(review): 'dict' below shadows the builtin; doc-only
                # observation, left as is.
                while key is not None:
                    if is_item(key):
                        dict = pickle.loads(data)
                        if isinstance(dict['pub_date'], mxd.DateTimeType):
                            p = dict['pub_date']
                            t = time.gmtime(time.mktime(round_second(p.tuple())))
                            dict['pub_date'] = t
                            data = pickle.dumps(dict)
                            cur.put(key, data, DB_CURRENT)
                    next = cur.next()
                    if next:
                        key, data = next
                    else:
                        break
            finally:
                cur.close()
        except Exception, ex:
            txn.abort()
            raise
        else:
            txn.commit()

    def convert_2_3(self):
        """v2 -> v3: derive imagecount: records from each item's image list."""
        def is_item(key):
            parts = key.split(':')
            if len(parts) != 2:
                return False
            return parts[0].isdigit() and parts[1].isdigit()

        imagelistcursor = None
        images = {}
        txn = self.begin_transaction()
        try:
            cur = self._db.cursor(txn=txn)
            try:
                next = cur.first()
                key = None
                if next:
                    key, data = cur.first()
                while key is not None:
                    if is_item(key):
                        dic = pickle.loads(data)
                        for image in dic['images']:
                            images[image] = images.get(image, 0) + 1
                    elif key == "images":
                        # Remember where the URL list lives so it can be
                        # rewritten in place below.
                        imagelistcursor = cur.dup(DB_POSITION)
                    next = cur.next()
                    if next:
                        key, data = next
                    else:
                        break
                for image, count in images.items():
                    key = ("imagecount:" + image).encode('utf-8')
                    cur.put(key, str(count), DB_KEYFIRST)
                # NOTE(review): unconditional call -- raises AttributeError
                # (None has no put) if no "images" record existed; confirm
                # v2 databases always have one.
                imagelistcursor.put("images", pickle.dumps(images.keys()), DB_CURRENT)
            finally:
                cur.close()
                if imagelistcursor != None:
                    imagelistcursor.close()
        except Exception, ex:
            txn.abort()
            raise
        else:
            txn.commit()
class ModifyItemsAction:
    """Queued request to rewrite a batch of items in the store."""

    def __init__(self, items):
        self._pending = items

    def doit(self, db):
        """Replay this action against the given database."""
        db.modify_items(self._pending)
class ItemsAddedAction:
    """Queued request to store newly arrived items of one feed."""

    def __init__(self, feed, items):
        self._owner_feed = feed
        self._new_items = items

    def doit(self, db):
        """Replay this action against the given database."""
        db.add_items(self._owner_feed, self._new_items)
class DeleteItemsAction:
    """Queued request to remove a batch of items belonging to one feed."""

    def __init__(self, feed, items):
        self._owner_feed = feed
        self._doomed = items

    def doit(self, db):
        """Replay this action against the given database."""
        db.delete_items(self._owner_feed, self._doomed)
class ImageUpdateAction:
    """Queued request to store (or clear) the data cached for one image."""

    def __init__(self, url, image):
        self._image_url = url
        self._image_data = image

    def doit(self, db):
        """Replay this action against the given database."""
        db.update_image(self._image_url, self._image_data)
class ImageCountChangedAction:
    """Queued request to set the reference count of one image."""

    def __init__(self, url, count):
        self._image_url = url
        self._refcount = count

    def doit(self, db):
        """Replay this action against the given database."""
        db.update_image_count(self._image_url, self._refcount)
class ItemStore:
    """Facade between the feed signal layer and MyDB.

    Mutations arriving via feed signals are queued as *Action objects and
    replayed against the database from a periodic mainloop timer, keeping
    database writes off the signal path. Reads go straight to MyDB.
    """

    def __init__(self, dbhome):
        self._db = MyDB(DATABASE_FILE_NAME, dbhome, create=1)
        self._stop = False
        self._action_queue = []
        self.connect_signals()
        c = feeds.category_list.all_category
        c.connect('feed-added', self._feed_created_cb)
        c.connect('feed-removed', self._feed_deleted_cb)

    def _feed_created_cb(self, category, feed, *args):
        # New feed: start persisting its item signals too.
        self._connect_feed_signals(feed)

    def _feed_deleted_cb(self, category, feed):
        # A removed feed queues deletion of all its stored items.
        self.items_deleted_cb(feed, feed.items)

    def connect_signals(self):
        """Subscribe to item signals of every feed that already exists."""
        flist = feeds.feedlist.flatten_list()
        for f in flist:
            self._connect_feed_signals(f)

    def _connect_feed_signals(self, feed):
        feed.connect('items-added', self.items_added_cb)
        feed.connect('items-changed', self.items_changed_cb)
        feed.connect('items-deleted', self.items_deleted_cb)

    def items_deleted_cb(self, feed, items):
        self._action_queue.append(DeleteItemsAction(feed, items))

    def items_added_cb(self, feed, items):
        self._action_queue.append(ItemsAddedAction(feed, items))

    def items_changed_cb(self, feed, items):
        self._action_queue.append(ModifyItemsAction(items))

    def image_updated(self, cache, url, data):
        # Image-cache callback signature; delegates to update_image.
        self.update_image(url, data)

    def update_image(self, url, data):
        self._action_queue.append(
            ImageUpdateAction(url, data))

    def read_image(self, url):
        """Return raw image data straight from the database (not queued)."""
        return self._db.get_image_data(url)

    def read_feed_items(self, feed):
        """Synchronously load all stored items of 'feed'."""
        return self._db.get_feed_items(feed)

    def get_number_of_unread(self, feed_id, cutoff):
        return self._db.get_number_of_unread(feed_id, cutoff)

    def get_image_counts(self):
        return self._db.get_image_counts()

    def set_image_count(self, image, count):
        self._action_queue.append(
            ImageCountChangedAction(image, count))

    def start(self):
        """Begin draining the action queue on a 5000 ms repeating timer."""
        mlmgr = MainloopManager.get_instance()
        mlmgr.set_repeating_timer(5000, self._run)

    def stop(self):
        """Cancel the timer, checkpoint, and close the database."""
        mlmgr = MainloopManager.get_instance()
        mlmgr.end_repeating_timer(self._run)
        self._db.checkpoint()
        self._db.close()
        self._stop = True

    def _run(self):
        # Timer callback installed by start().
        self._db.checkpoint()
        # NOTE(review): all these counters are locals, re-created on every
        # tick, so 'timer > freq' and 'cptimer > cpfreq' hold whenever any
        # time at all elapsed between the two time.time() calls -- the queue
        # is drained and a second checkpoint taken on (nearly) every tick.
        # Looks like this body was once a long-running loop; confirm before
        # relying on the intended 60-second checkpoint cadence.
        freq = 5
        timer = freq
        cpfreq = 60
        cptimer = cpfreq
        prevtime = time.time()
        if not self._stop:
            tmptime = time.time()
            timer += tmptime - prevtime
            cptimer += tmptime - prevtime
            prevtime = tmptime
            if timer > freq:
                try:
                    while len(self._action_queue):
                        action = self._action_queue.pop(0)
                        if action is None:
                            break
                        action.doit(self._db)
                except IndexError, e:
                    pass
                timer = 0
            if cptimer > cpfreq:
                self._db.checkpoint()
                cptimer = 0
# Process-wide singleton, created lazily by get_instance().
itemstore_instance = None

def get_instance():
    """Return the shared ItemStore, creating it on first use."""
    global itemstore_instance
    if itemstore_instance is not None:
        return itemstore_instance
    # Deferred import -- presumably avoids an import cycle at module load;
    # confirm against Config's imports.
    import Config
    itemstore_instance = ItemStore(Config.straw_home())
    return itemstore_instance
def stringify_item(item):
    """Serialize a summary item to a pickled dict of its plain fields.

    Inverse of unstringify_item; images are stored as their key list.
    """
    snapshot = dict(
        title=item.title,
        link=item.link,
        description=item.description,
        guid=item.guid,
        guidislink=item.guidislink,
        pub_date=item.pub_date,
        source=item.source,
        images=item.image_keys(),
        seen=item.seen,
        id=item.id,
        fm_license=item.fm_license,
        fm_changes=item.fm_changes,
        creator=item.creator,
        contributors=item.contributors,
        license_urls=item.license_urls,
        publication_name=item.publication_name,
        publication_volume=item.publication_volume,
        publication_number=item.publication_number,
        publication_section=item.publication_section,
        publication_starting_page=item.publication_starting_page,
        sticky=item._sticky,
        enclosures=item.enclosures)
    return pickle.dumps(snapshot)
def unstringify_item(itemstring):
    """Rebuild a SummaryItem from its pickled dict (see stringify_item).

    Returns None for empty input or when unpickling fails. Fields that
    later Straw versions added are restored with .get() defaults so old
    records still load.
    """
    if not itemstring:
        return None
    idict = _unpickle(itemstring)
    if not idict:
        return None
    item = SummaryItem.SummaryItem()
    # Core fields: present in every stored record.
    item.title = idict['title']
    item.link = idict['link']
    item.description = idict['description']
    item.guid = idict['guid']
    item.pub_date = idict['pub_date']
    item.source = idict['source']
    for image_key in idict['images']:
        item.restore_image(image_key)
    item.seen = idict['seen']
    item.id = idict['id']
    item.guidislink = idict.get('guidislink', True)
    # Optional fields added over time: restore with defaults.
    for attr in ('fm_license', 'fm_changes', 'creator', 'contributors',
                 'license_urls', 'enclosures', 'publication_name',
                 'publication_volume', 'publication_number',
                 'publication_section', 'publication_starting_page'):
        setattr(item, attr, idict.get(attr, None))
    item._sticky = idict.get('sticky', 0)
    return item
660 def _unpickle(istring):
661 itemdict = None
662 try:
663 itemdict = pickle.loads(istring)
664 except ValueError, ve:
665 log("ItemStore.unstringify_item: pickle.loads raised ValueError, argument was %s" % repr(itemstring))
666 except Exception, ex:
667 logtb(str(ex))
668 return itemdict
if __name__ == '__main__':
    # Debug entry point: open the item database under ~/.straw and dump
    # every record. pprint is imported into module scope here because
    # MyDB._data_print references it.
    from pprint import pprint
    db = MyDB("itemstore.db", "%s/.straw" % os.getenv('HOME'), create=1)
    db._db_print(db._data_print)