""" Data store abstraction module.

__copyright__ = "Copyright (c) 2002-2005 Free Software Foundation, Inc."

Straw is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
Foundation; either version 2 of the License, or (at your option) any later
version.

Straw is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc., 59 Temple
Place - Suite 330, Boston, MA 02111-1307, USA. """
20 import cPickle
as pickle
28 from bsddb
.db
import *
31 from bsddb3
.db
import *
32 import bsddb3
as bsddb
38 from MainloopManager
import MainloopManager
40 DATABASE_FILE_NAME
= "itemstore.db"
class ConvertException(Exception):
    """Raised when converting the item database between schema versions fails.

    Attributes:
        version1 -- schema version converted from
        version2 -- schema version converted to
        reason   -- human-readable explanation (callers pass e.g. the path of
                    a file containing the conversion traceback)
    """
    def __init__(self, version1, version2, reason):
        self.version1 = version1
        self.version2 = version2
        # Bug fix: 'reason' was accepted but never stored, although every
        # raise site supplies it and error reporting needs it.
        self.reason = reason
51 def __init__(self
, filename
, dbhome
, create
=0, truncate
=0, mode
=0600,
52 recover
=0, dbflags
=0):
55 recoverenv
= DB_CREATE | DB_RECOVER
56 # DB_INIT_TXN automatically enables logging
57 flagsforenv
= DB_INIT_TXN | DB_INIT_MPOOL | DB_INIT_LOCK | DB_PRIVATE
60 self
._env
.set_data_dir(dbhome
)
61 self
._env
.set_lk_detect(DB_LOCK_DEFAULT
) # enable auto deadlock avoidance
62 self
._env
.set_lg_max(2**20)
63 self
._env
.set_lk_max_locks (10000)
64 self
._env
.set_lk_max_objects (10000)
67 self
._env
.open(dbhome
, recoverenv | flagsforenv
, mode
)
68 except bsddb
._db
.DBRunRecoveryError
, err
:
69 self
._env
.remove(dbhome
)
72 sys
.exit("Recovery Error: See README for details on how to recover data. ")
79 flags |
= DB_AUTO_COMMIT
84 self
._db
= DB(self
._env
)
85 self
._db
.open(filename
, DB_BTREE
, flags
, mode
)
86 except bsddb
._db
.DBNoSuchFileError
:
88 self
._db
= DB(self
._env
)
89 self
._db
.open(filename
, DB_BTREE
, flags | DB_CREATE
, mode
)
90 self
.set_db_version(self
.CURRENT_VERSION
)
94 self
.convert_old_versions()
97 filename
= tempfile
.mktemp(prefix
="straw-")
98 fh
= open(filename
, "w")
99 traceback
.print_exc(None, fh
)
100 raise ConvertException(self
.get_db_version(),
101 MyDB
.CURRENT_VERSION
, "%s" % filename
)
106 if self
._db
is not None:
109 if self
._env
is not None:
113 def checkpoint(self
):
114 # set flags to 0 or DB_FORCE, else will raise EINVAL (InvalidArgError)
116 self
._env
.txn_checkpoint(cpflags | DB_FORCE
)
117 deletees
= self
._env
.log_archive(DB_ARCH_ABS
)
def begin_transaction(self):
    """Open a new transaction on the store's BDB environment and return it."""
    env = self._env
    return env.txn_begin()
124 def get_item_ids(self
, iid
, txn
):
125 key
= "fids:%d" % iid
126 dids
= self
._db
.get(key
, txn
=txn
)
129 ids
= pickle
.loads(dids
)
132 def save_feed_item_ids(self
, feed
, ids
, txn
=None):
133 rowid
= "fids:%d" % feed
.id
136 txn
= self
.begin_transaction()
140 self
._db
.delete(rowid
, txn
=txn
)
141 except DBNotFoundError
:
143 self
._db
.put(rowid
, pickle
.dumps(ids
), txn
=txn
)
144 except Exception, ex
:
def get_item(self, feed_id, item_id, txn=None):
    """Fetch one stored item keyed by feed id and item id.

    The raw pickled record is passed through unstringify_item, so the
    return value is whatever that helper builds from the stored bytes.
    """
    record_key = "%d:%d" % (feed_id, item_id)
    raw = self._db.get(record_key, txn=txn)
    return unstringify_item(raw)
156 def add_items(self
, feed
, items
):
157 txn
= self
.begin_transaction()
159 feed_item_ids
= self
.get_item_ids(feed
.id, txn
=txn
)
161 self
._db
.put("%d:%d" % (item
.feed
.id, item
.id), stringify_item(item
), txn
=txn
)
162 # TODO: it might be a good idea to check here that we don't add
163 # duplicate items. It doesn't happen normally, but there can be
164 # bugs that trigger that. Throwing an exception would be the
165 # the right thing: it wouldn't hide the breakage.
166 feed_item_ids
.append(item
.id)
167 self
.save_feed_item_ids(feed
, feed_item_ids
, txn
)
168 except Exception, ex
:
174 def delete_items(self
, feed
, items
):
175 """ Deletes a list of items.
177 Useful for cutting old items based on number of items stored.
179 txn
= self
.begin_transaction()
181 feed_item_ids
= self
.get_item_ids(feed
.id, txn
=txn
)
182 # because of bugs, we sometime get here duplicate ids. instead of dying,
183 # warn the user but continue
187 if item
.id in item_ids
:
188 log("WARNING: skipping duplicate ids in delete items request %s and %s" % (item
.title
, item
.id))
189 # filter out any duplicates
190 feed_item_ids
= filter(lambda x
: x
!= item
.id, feed_item_ids
)
192 item_ids
.append(item
.id)
193 #log("deleting item %d:%d" % (feed.id, item.id))
194 if item
.id in feed_item_ids
:
195 feed_item_ids
.remove(item
.id)
196 self
._db
.delete("%d:%d" % (feed
.id, item
.id), txn
=txn
)
197 self
.save_feed_item_ids(feed
, feed_item_ids
, txn
)
198 except Exception, ex
:
200 log_exc("error while deleting items")
204 def modify_items(self
, items
):
205 txn
= self
.begin_transaction()
208 self
._db
.put("%d:%d" % (item
.feed
.id, item
.id),
209 stringify_item(item
), txn
=txn
)
210 except Exception, ex
:
216 def get_feed_items(self
, feed
):
217 txn
= self
.begin_transaction()
220 ids
= self
.get_item_ids(feed
.id, txn
=txn
)
222 item
= self
.get_item(feed
.id, id, txn
=txn
)
225 except Exception, ex
:
233 def get_number_of_unread(self
, fid
, cutoff
):
234 # Used by config conversion
235 # NOTE: this is the number of unread items in 'number of items stored'
236 # preference. Since straw stores the most recent items down the list,
237 # we only count the unread items from the most recent N items,
239 txn
= self
.begin_transaction()
242 ids
= self
.get_item_ids(fid
, txn
=txn
)
243 for id in ids
[len(ids
)-cutoff
:]:
244 item
= self
.get_item(fid
, id, txn
=txn
)
245 if item
is not None and item
.seen
== 0:
248 except Exception, ex
:
255 def get_image_urls(self
, txn
=None):
256 dkeys
= self
._db
.get("images", txn
=txn
)
258 if dkeys
is not None:
259 keys
= pickle
.loads(dkeys
)
def save_image_urls(self, urls, txn=None):
    """Persist the list of known image URLs under the single 'images' key."""
    payload = pickle.dumps(urls)
    self._db.put("images", payload, txn=txn)
265 def get_image_counts(self
, txn
=None):
266 images
= self
.get_image_urls(txn
)
269 key
= ("imagecount:" + image
).encode('utf-8')
270 value
= self
._db
.get(str(key
))
272 counts
.append((image
, int(value
)))
274 log("exception for ", key
, ", type of value ", value
, ": ", type(value
))
277 def update_image_count(self
, url
, count
):
278 #logparam(locals(), "url", "count")
279 key
= ("imagecount:" + url
).encode('utf-8')
280 txn
= self
.begin_transaction()
283 self
._db
.delete(key
, txn
=txn
)
285 self
._db
.put(key
, str(count
), txn
=txn
)
292 def update_image(self
, url
, image
):
293 key
= "image:%s" % str(url
)
294 txn
= self
.begin_transaction()
296 image_urls
= self
.get_image_urls(txn
)
298 self
._db
.put(key
.encode('utf-8'), image
, txn
=txn
)
299 if url
not in image_urls
:
300 image_urls
.append(url
)
301 self
.save_image_urls(image_urls
, txn
)
303 if url
in image_urls
:
305 self
._db
.delete(key
, txn
=txn
)
306 except DBNotFoundError
:
307 log("Key not found", key
)
308 image_urls
.remove(url
)
309 self
.save_image_urls(image_urls
, txn
=txn
)
316 def get_image_data(self
, url
, txn
=None):
318 "image:%s" % url
.encode('utf-8'), default
= None, txn
=txn
)
320 def _image_print(self
, key
, data
):
321 if key
[:6] == "image:":
324 def _data_print(self
, key
, data
):
325 data
= pickle
.loads(data
)
328 def _db_print(self
, helper
):
329 """Print the database to stdout for debugging"""
330 print "******** Printing raw database for debugging ********"
331 print "database version: %s" % self
.get_db_version()
332 cur
= self
._db
.cursor()
334 key
, data
= cur
.first()
343 def get_db_version(self
, txn
=None):
344 version
= self
._db
.get("straw_db_version", default
= "1", txn
=txn
)
347 def set_db_version(self
, version
, txn
=None):
350 txn
= self
.begin_transaction()
351 self
._db
.put("straw_db_version", str(version
), txn
=txn
)
358 def convert_old_versions(self
):
359 version
= self
.get_db_version()
360 while version
< self
.CURRENT_VERSION
:
362 mname
= "convert_%d_%d" % (version
, next
)
364 method
= getattr(self
, mname
)
365 except AttributeError:
366 raise ConvertException(version
, next
, "No conversion function specified")
368 self
.set_db_version(next
)
371 def convert_1_2(self
):
373 parts
= key
.split(':')
376 return parts
[0].isdigit() and parts
[1].isdigit()
378 def round_second(ttuple
):
380 l
[5] = int(round(l
[5]))
384 import mx
.DateTime
as mxd
386 raise ConvertException(1, 2, _("Couldn't import mx.DateTime"))
387 txn
= self
.begin_transaction()
389 cur
= self
._db
.cursor(txn
=txn
)
394 key
, data
= cur
.first()
395 while key
is not None:
397 dict = pickle
.loads(data
)
398 if isinstance(dict['pub_date'], mxd
.DateTimeType
):
400 t
= time
.gmtime(time
.mktime(round_second(p
.tuple())))
402 data
= pickle
.dumps(dict)
403 cur
.put(key
, data
, DB_CURRENT
)
411 except Exception, ex
:
417 def convert_2_3(self
):
419 parts
= key
.split(':')
422 return parts
[0].isdigit() and parts
[1].isdigit()
424 imagelistcursor
= None
426 txn
= self
.begin_transaction()
428 cur
= self
._db
.cursor(txn
=txn
)
433 key
, data
= cur
.first()
434 while key
is not None:
436 dic
= pickle
.loads(data
)
437 for image
in dic
['images']:
438 images
[image
] = images
.get(image
, 0) + 1
439 elif key
== "images":
440 imagelistcursor
= cur
.dup(DB_POSITION
)
446 for image
, count
in images
.items():
447 key
= ("imagecount:" + image
).encode('utf-8')
448 cur
.put(key
, str(count
), DB_KEYFIRST
)
449 imagelistcursor
.put("images", pickle
.dumps(images
.keys()), DB_CURRENT
)
452 if imagelistcursor
!= None:
453 imagelistcursor
.close()
454 except Exception, ex
:
class ModifyItemsAction:
    """Queued action: rewrite already-stored items in the database."""
    def __init__(self, items):
        self._items = items

    def doit(self, db):
        # Flush the remembered items through the store's modify path.
        db.modify_items(self._items)
class ItemsAddedAction:
    """Queued action: write a feed's newly arrived items to the database."""
    def __init__(self, feed, items):
        self._feed = feed
        self._items = items

    def doit(self, db):
        # Hand the feed and its new items to the store's add path.
        db.add_items(self._feed, self._items)
class DeleteItemsAction:
    """Queued action: remove a feed's items from the database."""
    def __init__(self, feed, items):
        self._feed = feed
        self._items = items

    def doit(self, db):
        # Delegate the actual deletion to the store.
        db.delete_items(self._feed, self._items)
class ImageUpdateAction:
    """Queued action: store (or drop) the image data for a URL."""
    def __init__(self, url, image):
        self._url = url
        self._image = image

    def doit(self, db):
        # Persist the remembered image bytes under the remembered URL.
        db.update_image(self._url, self._image)
class ImageCountChangedAction:
    """Queued action: update an image's reference count in the database."""
    def __init__(self, url, count):
        self._url = url
        self._count = count

    def doit(self, db):
        # Write the new reference count for this image URL.
        db.update_image_count(self._url, self._count)
500 def __init__(self
, dbhome
):
501 self
._db
= MyDB(DATABASE_FILE_NAME
, dbhome
, create
= 1)
503 self
._action
_queue
= []
504 self
.connect_signals()
505 ImageCache
.cache
.connect('image-updated', self
.image_updated
)
506 c
= feeds
.category_list
.all_category
507 c
.connect('feed-added', self
._feed
_created
_cb
)
508 c
.connect('feed-removed', self
._feed
_deleted
_cb
)
def _feed_created_cb(self, category, feed, *args):
    """Signal handler: wire up item signals whenever a feed is added."""
    self._connect_feed_signals(feed)
def _feed_deleted_cb(self, category, feed):
    """Signal handler: queue deletion of all items of a removed feed."""
    self.items_deleted_cb(feed, feed.items)
def connect_signals(self):
    """Subscribe to item signals of every feed currently in the feed list."""
    flist = feeds.feedlist.flatten_list()
    for f in flist:
        self._connect_feed_signals(f)
def _connect_feed_signals(self, feed):
    """Hook this store's queueing callbacks onto one feed's item signals."""
    handlers = (
        ('items-added', self.items_added_cb),
        ('items-changed', self.items_changed_cb),
        ('items-deleted', self.items_deleted_cb),
    )
    for signal_name, handler in handlers:
        feed.connect(signal_name, handler)
def items_deleted_cb(self, feed, items):
    """Signal handler: queue a delete of the given items for later flushing."""
    action = DeleteItemsAction(feed, items)
    self._action_queue.append(action)
def items_added_cb(self, feed, items):
    """Signal handler: queue a write of newly arrived items."""
    action = ItemsAddedAction(feed, items)
    self._action_queue.append(action)
def items_changed_cb(self, feed, items):
    """Signal handler: queue a rewrite of modified items."""
    action = ModifyItemsAction(items)
    self._action_queue.append(action)
def image_updated(self, cache, url, data):
    """ImageCache signal handler: queue persisting of new image data."""
    action = ImageUpdateAction(url, data)
    self._action_queue.append(action)
def read_image(self, url):
    """Return the raw image data stored for the given URL (delegates to MyDB)."""
    return self._db.get_image_data(url)
def read_feed_items(self, feed):
    """Load every stored item belonging to the given feed (delegates to MyDB)."""
    return self._db.get_feed_items(feed)
def get_number_of_unread(self, feed_id, cutoff):
    """Count unread items among the feed's most recent `cutoff` stored items."""
    return self._db.get_number_of_unread(feed_id, cutoff)
def get_image_counts(self):
    """Return (image url, reference count) pairs from the store."""
    return self._db.get_image_counts()
def set_image_count(self, image, count):
    """Queue an update of an image's reference count for later flushing."""
    action = ImageCountChangedAction(image, count)
    self._action_queue.append(action)
556 mlmgr
= MainloopManager
.get_instance()
557 mlmgr
.set_repeating_timer(5000, self
._run
)
560 mlmgr
= MainloopManager
.get_instance()
561 mlmgr
.end_repeating_timer(self
._run
)
562 self
._db
.checkpoint()
567 self
._db
.checkpoint()
572 prevtime
= time
.time()
574 tmptime
= time
.time()
575 timer
+= tmptime
- prevtime
576 cptimer
+= tmptime
- prevtime
580 while len(self
._action
_queue
):
581 action
= self
._action
_queue
.pop(0)
584 action
.doit(self
._db
)
585 except IndexError, e
:
589 self
._db
.checkpoint()
592 itemstore_instance
= None
594 global itemstore_instance
595 if itemstore_instance
is None:
597 itemstore_instance
= ItemStore(Config
.straw_home())
598 return itemstore_instance
600 def stringify_item(item
):
604 'description': item
.description
,
606 'guidislink': item
.guidislink
,
607 'pub_date': item
.pub_date
,
608 'source': item
.source
,
609 'images': item
.image_keys(),
612 'fm_license': item
.fm_license
,
613 'fm_changes': item
.fm_changes
,
614 'creator': item
.creator
,
615 'contributors': item
.contributors
,
616 'license_urls': item
.license_urls
,
617 'publication_name': item
.publication_name
,
618 'publication_volume': item
.publication_volume
,
619 'publication_number': item
.publication_number
,
620 'publication_section': item
.publication_section
,
621 'publication_starting_page': item
.publication_starting_page
,
622 'sticky': item
._sticky
,
623 'enclosures': item
.enclosures
}
624 return pickle
.dumps(itemdict
)
626 def unstringify_item(itemstring
):
629 idict
= _unpickle(itemstring
)
633 item
= SummaryItem
.SummaryItem()
634 item
.title
= idict
['title']
635 item
.link
= idict
['link']
636 item
.description
= idict
['description']
637 item
.guid
= idict
['guid']
638 item
.pub_date
= idict
['pub_date']
639 item
.source
= idict
['source']
640 for i
in idict
['images']:
641 item
.restore_image(i
)
642 item
.seen
= idict
['seen']
643 item
.id = idict
['id']
644 item
.guidislink
= idict
.get('guidislink', True)
645 item
.fm_license
= idict
.get('fm_license', None)
646 item
.fm_changes
= idict
.get('fm_changes', None)
647 item
.creator
= idict
.get('creator', None)
648 item
.contributors
= idict
.get('contributors', None)
649 item
.license_urls
= idict
.get('license_urls', None)
650 item
._sticky
= idict
.get('sticky', 0)
651 item
.enclosures
= idict
.get('enclosures', None)
652 item
.publication_name
= idict
.get('publication_name', None)
653 item
.publication_volume
= idict
.get('publication_volume', None)
654 item
.publication_number
= idict
.get('publication_number', None)
655 item
.publication_section
= idict
.get('publication_section', None)
656 item
.publication_starting_page
= idict
.get('publication_starting_page', None)
659 def _unpickle(istring
):
662 itemdict
= pickle
.loads(istring
)
663 except ValueError, ve
:
664 log("ItemStore.unstringify_item: pickle.loads raised ValueError, argument was %s" % repr(itemstring
))
665 except Exception, ex
:
if __name__ == '__main__':
    # Ad-hoc debugging entry point: open the item store under ~/.straw
    # and dump its raw contents to stdout.
    from pprint import pprint
    db = MyDB("itemstore.db", "%s/.straw" % os.getenv('HOME'), create=1)
    db._db_print(db._data_print)