""" Data store abstraction module.

__copyright__ = "Copyright (c) 2002-2005 Free Software Foundation, Inc."

Straw is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
Foundation; either version 2 of the License, or (at your option) any later
version.

Straw is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc., 59 Temple
Place - Suite 330, Boston, MA 02111-1307, USA. """
20 import cPickle
as pickle
28 from bsddb
.db
import *
31 from bsddb3
.db
import *
32 import bsddb3
as bsddb
37 from MainloopManager
import MainloopManager
# File name of the BerkeleyDB database that stores the serialized feed items.
DATABASE_FILE_NAME = "itemstore.db"
class ConvertException(Exception):
    """Raised when converting the item database between schema versions fails.

    Attributes:
        version1: schema version the conversion started from.
        version2: schema version the conversion was targeting.
        reason: human-readable description of what went wrong.
    """
    def __init__(self, version1, version2, reason):
        self.version1 = version1
        self.version2 = version2
        # Fix: 'reason' was accepted but discarded; callers (e.g. the
        # convert_old_versions error path) pass a meaningful message here.
        self.reason = reason
50 def __init__(self
, filename
, dbhome
, create
=0, truncate
=0, mode
=0600,
51 recover
=0, dbflags
=0):
54 recoverenv
= DB_CREATE | DB_RECOVER
55 # DB_INIT_TXN automatically enables logging
56 flagsforenv
= DB_INIT_TXN | DB_INIT_MPOOL | DB_INIT_LOCK | DB_PRIVATE
59 self
._env
.set_data_dir(dbhome
)
60 self
._env
.set_lk_detect(DB_LOCK_DEFAULT
) # enable auto deadlock avoidance
61 self
._env
.set_lg_max(2**20)
62 self
._env
.set_lk_max_locks (10000)
63 self
._env
.set_lk_max_objects (10000)
66 self
._env
.open(dbhome
, recoverenv | flagsforenv
, mode
)
67 except bsddb
._db
.DBRunRecoveryError
, err
:
68 self
._env
.remove(dbhome
)
71 sys
.exit("Recovery Error: See README for details on how to recover data. ")
78 flags |
= DB_AUTO_COMMIT
83 self
._db
= DB(self
._env
)
84 self
._db
.open(filename
, DB_BTREE
, flags
, mode
)
85 except bsddb
._db
.DBNoSuchFileError
:
87 self
._db
= DB(self
._env
)
88 self
._db
.open(filename
, DB_BTREE
, flags | DB_CREATE
, mode
)
89 self
.set_db_version(self
.CURRENT_VERSION
)
93 self
.convert_old_versions()
96 filename
= tempfile
.mktemp(prefix
="straw-")
97 fh
= open(filename
, "w")
98 traceback
.print_exc(None, fh
)
99 raise ConvertException(self
.get_db_version(),
100 MyDB
.CURRENT_VERSION
, "%s" % filename
)
105 if self
._db
is not None:
108 if self
._env
is not None:
112 def checkpoint(self
):
113 # set flags to 0 or DB_FORCE, else will raise EINVAL (InvalidArgError)
115 self
._env
.txn_checkpoint(cpflags | DB_FORCE
)
116 deletees
= self
._env
.log_archive(DB_ARCH_ABS
)
def begin_transaction(self):
    """Start a new transaction on the database environment and return it."""
    txn = self._env.txn_begin()
    return txn
123 def get_item_ids(self
, iid
, txn
):
124 key
= "fids:%d" % iid
125 dids
= self
._db
.get(key
, txn
=txn
)
128 ids
= pickle
.loads(dids
)
131 def save_feed_item_ids(self
, feed
, ids
, txn
=None):
132 rowid
= "fids:%d" % feed
.id
135 txn
= self
.begin_transaction()
139 self
._db
.delete(rowid
, txn
=txn
)
140 except DBNotFoundError
:
142 self
._db
.put(rowid
, pickle
.dumps(ids
), txn
=txn
)
143 except Exception, ex
:
def get_item(self, feed_id, item_id, txn=None):
    """Fetch one stored item keyed by feed id and item id, deserialized.

    The raw record is looked up under the "feed:item" composite key and
    passed through unstringify_item for unpickling.
    """
    key = "%d:%d" % (feed_id, item_id)
    raw = self._db.get(key, txn=txn)
    return unstringify_item(raw)
155 def add_items(self
, feed
, items
):
156 txn
= self
.begin_transaction()
158 feed_item_ids
= self
.get_item_ids(feed
.id, txn
=txn
)
160 self
._db
.put("%d:%d" % (item
.feed
.id, item
.id), stringify_item(item
), txn
=txn
)
161 # TODO: it might be a good idea to check here that we don't add
162 # duplicate items. It doesn't happen normally, but there can be
163 # bugs that trigger that. Throwing an exception would be the
164 # the right thing: it wouldn't hide the breakage.
165 feed_item_ids
.append(item
.id)
166 self
.save_feed_item_ids(feed
, feed_item_ids
, txn
)
167 except Exception, ex
:
173 def delete_items(self
, feed
, items
):
174 """ Deletes a list of items.
176 Useful for cutting old items based on number of items stored.
178 txn
= self
.begin_transaction()
180 feed_item_ids
= self
.get_item_ids(feed
.id, txn
=txn
)
181 # because of bugs, we sometime get here duplicate ids. instead of dying,
182 # warn the user but continue
186 if item
.id in item_ids
:
187 log("WARNING: skipping duplicate ids in delete items request %s and %s" % (item
.title
, item
.id))
188 # filter out any duplicates
189 feed_item_ids
= filter(lambda x
: x
!= item
.id, feed_item_ids
)
191 item_ids
.append(item
.id)
192 #log("deleting item %d:%d" % (feed.id, item.id))
193 if item
.id in feed_item_ids
:
194 feed_item_ids
.remove(item
.id)
195 self
._db
.delete("%d:%d" % (feed
.id, item
.id), txn
=txn
)
196 self
.save_feed_item_ids(feed
, feed_item_ids
, txn
)
197 except Exception, ex
:
199 log_exc("error while deleting items")
203 def modify_items(self
, items
):
204 txn
= self
.begin_transaction()
207 self
._db
.put("%d:%d" % (item
.feed
.id, item
.id),
208 stringify_item(item
), txn
=txn
)
209 except Exception, ex
:
215 def get_feed_items(self
, feed
):
216 txn
= self
.begin_transaction()
219 ids
= self
.get_item_ids(feed
.id, txn
=txn
)
221 item
= self
.get_item(feed
.id, id, txn
=txn
)
224 except Exception, ex
:
232 def get_number_of_unread(self
, fid
, cutoff
):
233 # Used by config conversion
234 # NOTE: this is the number of unread items in 'number of items stored'
235 # preference. Since straw stores the most recent items down the list,
236 # we only count the unread items from the most recent N items,
238 txn
= self
.begin_transaction()
241 ids
= self
.get_item_ids(fid
, txn
=txn
)
242 for id in ids
[len(ids
)-cutoff
:]:
243 item
= self
.get_item(fid
, id, txn
=txn
)
244 if item
is not None and item
.seen
== 0:
247 except Exception, ex
:
254 def get_image_urls(self
, txn
=None):
255 dkeys
= self
._db
.get("images", txn
=txn
)
257 if dkeys
is not None:
258 keys
= pickle
.loads(dkeys
)
def save_image_urls(self, urls, txn=None):
    """Persist the list of known image URLs under the 'images' record."""
    payload = pickle.dumps(urls)
    self._db.put("images", payload, txn=txn)
264 def get_image_counts(self
, txn
=None):
265 images
= self
.get_image_urls(txn
)
268 key
= ("imagecount:" + image
).encode('utf-8')
269 value
= self
._db
.get(str(key
))
271 counts
.append((image
, int(value
)))
273 log("exception for ", key
, ", type of value ", value
, ": ", type(value
))
276 def update_image_count(self
, url
, count
):
277 #logparam(locals(), "url", "count")
278 key
= ("imagecount:" + url
).encode('utf-8')
279 txn
= self
.begin_transaction()
282 self
._db
.delete(key
, txn
=txn
)
284 self
._db
.put(key
, str(count
), txn
=txn
)
291 def update_image(self
, url
, image
):
292 key
= "image:%s" % str(url
)
293 txn
= self
.begin_transaction()
295 image_urls
= self
.get_image_urls(txn
)
297 self
._db
.put(key
.encode('utf-8'), image
, txn
=txn
)
298 if url
not in image_urls
:
299 image_urls
.append(url
)
300 self
.save_image_urls(image_urls
, txn
)
302 if url
in image_urls
:
304 self
._db
.delete(key
, txn
=txn
)
305 except DBNotFoundError
:
306 log("Key not found", key
)
307 image_urls
.remove(url
)
308 self
.save_image_urls(image_urls
, txn
=txn
)
315 def get_image_data(self
, url
, txn
=None):
317 "image:%s" % url
.encode('utf-8'), default
= None, txn
=txn
)
319 def _image_print(self
, key
, data
):
320 if key
[:6] == "image:":
323 def _data_print(self
, key
, data
):
324 data
= pickle
.loads(data
)
327 def _db_print(self
, helper
):
328 """Print the database to stdout for debugging"""
329 print "******** Printing raw database for debugging ********"
330 print "database version: %s" % self
.get_db_version()
331 cur
= self
._db
.cursor()
333 key
, data
= cur
.first()
342 def get_db_version(self
, txn
=None):
343 version
= self
._db
.get("straw_db_version", default
= "1", txn
=txn
)
346 def set_db_version(self
, version
, txn
=None):
349 txn
= self
.begin_transaction()
350 self
._db
.put("straw_db_version", str(version
), txn
=txn
)
357 def convert_old_versions(self
):
358 version
= self
.get_db_version()
359 while version
< self
.CURRENT_VERSION
:
361 mname
= "convert_%d_%d" % (version
, next
)
363 method
= getattr(self
, mname
)
364 except AttributeError:
365 raise ConvertException(version
, next
, "No conversion function specified")
367 self
.set_db_version(next
)
370 def convert_1_2(self
):
372 parts
= key
.split(':')
375 return parts
[0].isdigit() and parts
[1].isdigit()
377 def round_second(ttuple
):
379 l
[5] = int(round(l
[5]))
383 import mx
.DateTime
as mxd
385 raise ConvertException(1, 2, _("Couldn't import mx.DateTime"))
386 txn
= self
.begin_transaction()
388 cur
= self
._db
.cursor(txn
=txn
)
393 key
, data
= cur
.first()
394 while key
is not None:
396 dict = pickle
.loads(data
)
397 if isinstance(dict['pub_date'], mxd
.DateTimeType
):
399 t
= time
.gmtime(time
.mktime(round_second(p
.tuple())))
401 data
= pickle
.dumps(dict)
402 cur
.put(key
, data
, DB_CURRENT
)
410 except Exception, ex
:
416 def convert_2_3(self
):
418 parts
= key
.split(':')
421 return parts
[0].isdigit() and parts
[1].isdigit()
423 imagelistcursor
= None
425 txn
= self
.begin_transaction()
427 cur
= self
._db
.cursor(txn
=txn
)
432 key
, data
= cur
.first()
433 while key
is not None:
435 dic
= pickle
.loads(data
)
436 for image
in dic
['images']:
437 images
[image
] = images
.get(image
, 0) + 1
438 elif key
== "images":
439 imagelistcursor
= cur
.dup(DB_POSITION
)
445 for image
, count
in images
.items():
446 key
= ("imagecount:" + image
).encode('utf-8')
447 cur
.put(key
, str(count
), DB_KEYFIRST
)
448 imagelistcursor
.put("images", pickle
.dumps(images
.keys()), DB_CURRENT
)
451 if imagelistcursor
!= None:
452 imagelistcursor
.close()
453 except Exception, ex
:
459 class ModifyItemsAction
:
460 def __init__(self
, items
):
464 db
.modify_items(self
._items
)
466 class ItemsAddedAction
:
467 def __init__(self
, feed
, items
):
472 db
.add_items(self
._feed
, self
._items
)
474 class DeleteItemsAction
:
475 def __init__(self
, feed
, items
):
480 db
.delete_items(self
._feed
, self
._items
)
482 class ImageUpdateAction
:
483 def __init__(self
, url
, image
):
488 db
.update_image(self
._url
, self
._image
)
490 class ImageCountChangedAction
:
491 def __init__(self
, url
, count
):
496 db
.update_image_count(self
._url
, self
._count
)
499 def __init__(self
, dbhome
):
500 self
._db
= MyDB(DATABASE_FILE_NAME
, dbhome
, create
= 1)
502 self
._action
_queue
= []
503 self
.connect_signals()
504 c
= feeds
.category_list
.all_category
505 c
.connect('feed-added', self
._feed
_created
_cb
)
506 c
.connect('feed-removed', self
._feed
_deleted
_cb
)
508 def _feed_created_cb(self
, category
, feed
, *args
):
509 self
._connect
_feed
_signals
(feed
)
511 def _feed_deleted_cb(self
, category
, feed
):
512 self
.items_deleted_cb(feed
, feed
.items
)
514 def connect_signals(self
):
515 flist
= feeds
.feedlist
.flatten_list()
517 self
._connect
_feed
_signals
(f
)
519 def _connect_feed_signals(self
, feed
):
520 feed
.connect('items-added', self
.items_added_cb
)
521 feed
.connect('items-changed', self
.items_changed_cb
)
522 feed
.connect('items-deleted', self
.items_deleted_cb
)
def items_deleted_cb(self, feed, items):
    """Queue a deferred deletion of *items* belonging to *feed*."""
    action = DeleteItemsAction(feed, items)
    self._action_queue.append(action)
def items_added_cb(self, feed, items):
    """Queue a deferred insertion of *items* belonging to *feed*."""
    action = ItemsAddedAction(feed, items)
    self._action_queue.append(action)
def items_changed_cb(self, feed, items):
    """Queue a deferred rewrite of modified *items*."""
    action = ModifyItemsAction(items)
    self._action_queue.append(action)
def image_updated(self, cache, url, data):
    """Signal handler: forward an image-cache update into the action queue."""
    self.update_image(url, data)
def update_image(self, url, data):
    """Queue a deferred write of image *data* for *url*."""
    action = ImageUpdateAction(url, data)
    self._action_queue.append(action)
def read_image(self, url):
    """Return the stored raw image data for *url* from the backing database."""
    return self._db.get_image_data(url)
def read_feed_items(self, feed):
    """Load all stored items of *feed* from the backing database."""
    return self._db.get_feed_items(feed)
def get_number_of_unread(self, feed_id, cutoff):
    """Delegate the unread-item count query to the backing database."""
    return self._db.get_number_of_unread(feed_id, cutoff)
def get_image_counts(self):
    """Delegate the per-image reference-count query to the backing database."""
    return self._db.get_image_counts()
def set_image_count(self, image, count):
    """Queue a deferred update of the reference count for *image*."""
    action = ImageCountChangedAction(image, count)
    self._action_queue.append(action)
557 mlmgr
= MainloopManager
.get_instance()
558 mlmgr
.set_repeating_timer(5000, self
._run
)
561 mlmgr
= MainloopManager
.get_instance()
562 mlmgr
.end_repeating_timer(self
._run
)
563 self
._db
.checkpoint()
568 self
._db
.checkpoint()
573 prevtime
= time
.time()
575 tmptime
= time
.time()
576 timer
+= tmptime
- prevtime
577 cptimer
+= tmptime
- prevtime
581 while len(self
._action
_queue
):
582 action
= self
._action
_queue
.pop(0)
585 action
.doit(self
._db
)
586 except IndexError, e
:
590 self
._db
.checkpoint()
# Module-level singleton ItemStore; created lazily on first request.
itemstore_instance = None
595 global itemstore_instance
596 if itemstore_instance
is None:
598 itemstore_instance
= ItemStore(Config
.straw_home())
599 return itemstore_instance
601 def stringify_item(item
):
605 'description': item
.description
,
607 'guidislink': item
.guidislink
,
608 'pub_date': item
.pub_date
,
609 'source': item
.source
,
610 'images': item
.image_keys(),
613 'fm_license': item
.fm_license
,
614 'fm_changes': item
.fm_changes
,
615 'creator': item
.creator
,
616 'contributors': item
.contributors
,
617 'license_urls': item
.license_urls
,
618 'publication_name': item
.publication_name
,
619 'publication_volume': item
.publication_volume
,
620 'publication_number': item
.publication_number
,
621 'publication_section': item
.publication_section
,
622 'publication_starting_page': item
.publication_starting_page
,
623 'sticky': item
._sticky
,
624 'enclosures': item
.enclosures
}
625 return pickle
.dumps(itemdict
)
627 def unstringify_item(itemstring
):
630 idict
= _unpickle(itemstring
)
634 item
= SummaryItem
.SummaryItem()
635 item
.title
= idict
['title']
636 item
.link
= idict
['link']
637 item
.description
= idict
['description']
638 item
.guid
= idict
['guid']
639 item
.pub_date
= idict
['pub_date']
640 item
.source
= idict
['source']
641 for i
in idict
['images']:
642 item
.restore_image(i
)
643 item
.seen
= idict
['seen']
644 item
.id = idict
['id']
645 item
.guidislink
= idict
.get('guidislink', True)
646 item
.fm_license
= idict
.get('fm_license', None)
647 item
.fm_changes
= idict
.get('fm_changes', None)
648 item
.creator
= idict
.get('creator', None)
649 item
.contributors
= idict
.get('contributors', None)
650 item
.license_urls
= idict
.get('license_urls', None)
651 item
._sticky
= idict
.get('sticky', 0)
652 item
.enclosures
= idict
.get('enclosures', None)
653 item
.publication_name
= idict
.get('publication_name', None)
654 item
.publication_volume
= idict
.get('publication_volume', None)
655 item
.publication_number
= idict
.get('publication_number', None)
656 item
.publication_section
= idict
.get('publication_section', None)
657 item
.publication_starting_page
= idict
.get('publication_starting_page', None)
660 def _unpickle(istring
):
663 itemdict
= pickle
.loads(istring
)
664 except ValueError, ve
:
665 log("ItemStore.unstringify_item: pickle.loads raised ValueError, argument was %s" % repr(itemstring
))
666 except Exception, ex
:
if __name__ == '__main__':
    # Debugging entry point: open the item database under ~/.straw and
    # dump its raw contents to stdout via the data-record printer.
    from pprint import pprint
    db = MyDB("itemstore.db", "%s/.straw" % os.getenv('HOME'), create = 1)
    db._db_print(db._data_print)