3 Data store abstraction module.
5 __copyright__
= "Copyright (c) 2002-2005 Free Software Foundation, Inc."
7 Straw is free software; you can redistribute it and/or modify it under the
8 terms of the GNU General Public License as published by the Free Software
9 Foundation; either version 2 of the License, or (at your option) any later
12 Straw is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License along with
17 this program; if not, write to the Free Software Foundation, Inc., 59 Temple
18 Place - Suite 330, Boston, MA 02111-1307, USA. """
20 import cPickle
as pickle
28 from bsddb
.db
import *
31 from bsddb3
.db
import *
32 import bsddb3
as bsddb
38 from MainloopManager
import MainloopManager
40 DATABASE_FILE_NAME
= "itemstore.db"
class ConvertException(Exception):
    """Raised when converting the itemstore database between schema
    versions fails.

    Attributes:
        version1: the schema version converted from.
        version2: the schema version converted to.
        reason:   human-readable description of the failure (in this
                  module, typically the path of a traceback dump file).
    """
    def __init__(self, version1, version2, reason):
        self.version1 = version1
        self.version2 = version2
        # Fix: 'reason' was accepted but never stored in the visible
        # code, although callers pass a failure description.
        self.reason = reason
# NOTE(review): this extract is fragmentary -- several original source
# lines are missing throughout this method (try/except openers, the
# creation of self._env, the initial 'flags' setup, and the close()
# method header).  Code tokens below are kept exactly as found.
def __init__(self, filename, dbhome, create=0, truncate=0, mode=0600,
             recover=0, dbflags=0):
    # Environment open flags: create if needed and always run recovery.
    recoverenv = DB_CREATE | DB_RECOVER
    # DB_INIT_TXN automatically enables logging
    flagsforenv = DB_INIT_TXN | DB_INIT_MPOOL | DB_INIT_LOCK | DB_PRIVATE
    self._env.set_data_dir(dbhome)
    self._env.set_lk_detect(DB_LOCK_DEFAULT) # enable auto deadlock avoidance
    # Cap each transaction log file at 1 MiB.
    self._env.set_lg_max(2**20)
    self._env.set_lk_max_locks (10000)
    self._env.set_lk_max_objects (10000)
    self._env.open(dbhome, recoverenv | flagsforenv, mode)
# Recovery itself failed: drop the environment and abort the program.
except bsddb._db.DBRunRecoveryError, err:
    self._env.remove(dbhome)
    sys.exit("Recovery Error: See README for details on how to recover data. ")
# Every write takes part in an implicit auto-commit transaction.
flags |= DB_AUTO_COMMIT
    self._db = DB(self._env)
    self._db.open(filename, DB_BTREE, flags, mode)
# Database file absent: create it and stamp the current schema version.
except bsddb._db.DBNoSuchFileError:
    self._db = DB(self._env)
    self._db.open(filename, DB_BTREE, flags | DB_CREATE, mode)
    self.set_db_version(self.CURRENT_VERSION)
# Upgrade any older database in place; on failure dump the traceback
# to a temp file and raise ConvertException pointing at that file.
self.convert_old_versions()
filename = tempfile.mktemp(prefix="straw-")
fh = open(filename, "w")
traceback.print_exc(None, fh)
raise ConvertException(self.get_db_version(),
                       MyDB.CURRENT_VERSION, "%s" % filename)
# close(): release DB handle and environment if they were opened.
# NOTE(review): method header and close() call lines missing here.
if self._db is not None:
if self._env is not None:
def checkpoint(self):
    # set flags to 0 or DB_FORCE, else will raise EINVAL (InvalidArgError)
    # NOTE(review): the cpflags initialisation line is missing here.
    self._env.txn_checkpoint(cpflags | DB_FORCE)
    # Collect log files made obsolete by the checkpoint.
    deletees = self._env.log_archive(DB_ARCH_ABS)
def begin_transaction(self):
    """Open and return a new transaction on the store's DB environment."""
    environment = self._env
    return environment.txn_begin()
def get_item_ids(self, iid, txn):
    """Return the pickled list of item ids stored for feed *iid*.

    NOTE(review): extract is fragmentary -- the None-guard and the
    return statement of this function are not visible here.
    """
    key = "fids:%d" % iid
    dids = self._db.get(key, txn=txn)
    ids = pickle.loads(dids)

def save_feed_item_ids(self, feed, ids, txn=None):
    """Pickle and store *ids* as the item-id list for *feed*.

    NOTE(review): extract is fragmentary -- the txn guard, try
    openers and exception-handler bodies are not visible here.
    """
    rowid = "fids:%d" % feed.id
    txn = self.begin_transaction()
    # Delete-then-put; an absent old row is not an error.
    self._db.delete(rowid, txn=txn)
    except DBNotFoundError:
    self._db.put(rowid, pickle.dumps(ids), txn=txn)
    except Exception, ex:
def get_item(self, feed_id, item_id, txn=None):
    """Load the stored item keyed by (feed_id, item_id) and deserialize it."""
    raw = self._db.get("%d:%d" % (feed_id, item_id), txn=txn)
    return unstringify_item(raw)
def add_items(self, feed, items):
    """Store new *items* for *feed* and update the feed's id list.

    NOTE(review): extract is fragmentary -- try opener, the
    'for item in items' loop header and commit lines are missing.
    """
    txn = self.begin_transaction()
    feed_item_ids = self.get_item_ids(feed.id, txn=txn)
    # Row key layout is "<feed id>:<item id>" -> pickled item dict.
    self._db.put("%d:%d" % (item.feed.id, item.id), stringify_item(item), txn=txn)
    # TODO: it might be a good idea to check here that we don't add
    # duplicate items. It doesn't happen normally, but there can be
    # bugs that trigger that. Throwing an exception would be the
    # the right thing: it wouldn't hide the breakage.
    feed_item_ids.append(item.id)
    self.save_feed_item_ids(feed, feed_item_ids, txn)
    except Exception, ex:

def delete_items(self, feed, items):
    """ Deletes a list of items.

    Useful for cutting old items based on number of items stored.

    NOTE(review): extract is fragmentary -- loop headers, the
    commit and the exception-handler tail are missing.
    """
    txn = self.begin_transaction()
    feed_item_ids = self.get_item_ids(feed.id, txn=txn)
    # because of bugs, we sometime get here duplicate ids. instead of dying,
    # warn the user but continue
    if item.id in item_ids:
        log("WARNING: skipping duplicate ids in delete items request %s and %s" % (item.title, item.id))
        # filter out any duplicates
        feed_item_ids = filter(lambda x: x != item.id, feed_item_ids)
    item_ids.append(item.id)
    #log("deleting item %d:%d" % (feed.id, item.id))
    if item.id in feed_item_ids:
        feed_item_ids.remove(item.id)
    self._db.delete("%d:%d" % (feed.id, item.id), txn=txn)
    self.save_feed_item_ids(feed, feed_item_ids, txn)
    except Exception, ex:
        log_exc("error while deleting items")

def modify_items(self, items):
    """Re-serialize and overwrite each item in *items* in place.

    NOTE(review): extract is fragmentary -- try and loop openers
    plus commit lines are missing.
    """
    txn = self.begin_transaction()
    self._db.put("%d:%d" % (item.feed.id, item.id),
                 stringify_item(item), txn=txn)
    except Exception, ex:
def get_feed_items(self, feed):
    """Load every stored item of *feed*.

    NOTE(review): extract is fragmentary -- accumulator, loop header
    and return lines are missing.
    """
    txn = self.begin_transaction()
    ids = self.get_item_ids(feed.id, txn=txn)
    item = self.get_item(feed.id, id, txn=txn)
    except Exception, ex:

def get_number_of_unread(self, fid, cutoff):
    # Used by config conversion
    # NOTE: this is the number of unread items in 'number of items stored'
    # preference. Since straw stores the most recent items down the list,
    # we only count the unread items from the most recent N items,
    # NOTE(review): extract is fragmentary -- counter init, increment
    # and return lines are missing.
    txn = self.begin_transaction()
    ids = self.get_item_ids(fid, txn=txn)
    # Only the newest *cutoff* ids (tail of the list) are examined.
    for id in ids[len(ids)-cutoff:]:
        item = self.get_item(fid, id, txn=txn)
        if item is not None and item.seen == 0:
    except Exception, ex:

def get_image_urls(self, txn=None):
    """Return the stored image URL list, unpickled from the 'images' row.

    NOTE(review): extract is fragmentary -- the default/return lines
    are missing.
    """
    dkeys = self._db.get("images", txn=txn)
    if dkeys is not None:
        keys = pickle.loads(dkeys)
def save_image_urls(self, urls, txn=None):
    """Pickle *urls* and store the blob under the 'images' key."""
    blob = pickle.dumps(urls)
    self._db.put("images", blob, txn=txn)
def get_image_counts(self, txn=None):
    """Return (url, refcount) pairs for all stored images.

    NOTE(review): extract is fragmentary -- accumulator init, loop
    header, try opener and return are missing.
    """
    images = self.get_image_urls(txn)
    key = ("imagecount:" + image).encode('utf-8')
    value = self._db.get(str(key))
    counts.append((image, int(value)))
    # Fallback logging when the stored count is not an int.
    log("exception for ", key, ", type of value ", value, ": ", type(value))

def update_image_count(self, url, count):
    #logparam(locals(), "url", "count")
    # NOTE(review): extract is fragmentary -- try/except wrappers and
    # the commit are missing.
    key = ("imagecount:" + url).encode('utf-8')
    txn = self.begin_transaction()
    # Replace any previous count for this url.
    self._db.delete(key, txn=txn)
    self._db.put(key, str(count), txn=txn)

def update_image(self, url, image):
    """Store or remove image data and keep the url list in sync.

    A non-None *image* is written under "image:<url>"; otherwise the
    entry is deleted.  NOTE(review): extract is fragmentary -- try
    openers, else branch header and commit are missing.
    """
    key = "image:%s" % str(url)
    txn = self.begin_transaction()
    image_urls = self.get_image_urls(txn)
    if image is not None:
        self._db.put(key.encode('utf-8'), image, txn=txn)
        if url not in image_urls:
            image_urls.append(url)
            self.save_image_urls(image_urls, txn)
    if url in image_urls:
        self._db.delete(key, txn=txn)
    except DBNotFoundError:
        # Deleting an absent row is tolerated, just logged.
        log("Key not found", key)
    image_urls.remove(url)
    self.save_image_urls(image_urls, txn=txn)

def get_image_data(self, url, txn=None):
    # NOTE(review): the enclosing call (presumably
    # 'return self._db.get(' ) is missing from this extract; only the
    # argument list survives.
    "image:%s" % url.encode('utf-8'), default = None, txn=txn)
def _image_print(self, key, data):
    # Debug helper for _db_print: show image rows only.
    # NOTE(review): print body missing from this extract.
    if key[:6] == "image:":

def _data_print(self, key, data):
    # Debug helper for _db_print: unpickle a row before display.
    # NOTE(review): print line missing from this extract.
    data = pickle.loads(data)

def _db_print(self, helper):
    """Print the database to stdout for debugging"""
    # NOTE(review): the cursor-walking loop is missing from this
    # extract; only the setup lines survive.
    print "******** Printing raw database for debugging ********"
    print "database version: %s" % self.get_db_version()
    cur = self._db.cursor()
    key, data = cur.first()
def get_db_version(self, txn=None):
    """Return the schema version stamp of the database.

    Pre-versioned databases default to "1".
    NOTE(review): the return statement is missing from this extract.
    """
    version = self._db.get("straw_db_version", default = "1", txn=txn)

def set_db_version(self, version, txn=None):
    """Stamp the database with *version*, stored as a string.

    NOTE(review): the txn guard and commit are missing here.
    """
    txn = self.begin_transaction()
    self._db.put("straw_db_version", str(version), txn=txn)

def convert_old_versions(self):
    """Upgrade the database one schema version at a time.

    Each step resolves a convert_<from>_<to> method by name; a missing
    converter aborts with ConvertException.
    NOTE(review): 'next' computation and loop-advance lines missing.
    """
    version = self.get_db_version()
    while version < self.CURRENT_VERSION:
        mname = "convert_%d_%d" % (version, next)
        method = getattr(self, mname)
        except AttributeError:
            raise ConvertException(version, next, "No conversion function specified")
        self.set_db_version(next)
def convert_1_2(self):
    """v1 -> v2 conversion: turn mx.DateTime pub_dates into time tuples.

    NOTE(review): extract is fragmentary -- nested helper headers,
    loop openers and cursor-advance lines are missing.
    """
    # An item row key looks like "<feed id>:<item id>".
    parts = key.split(':')
    return parts[0].isdigit() and parts[1].isdigit()

def round_second(ttuple):
    # Round the seconds field of a time tuple to the nearest int.
    l[5] = int(round(l[5]))

    # mx.DateTime is only needed for this legacy conversion.
    import mx.DateTime as mxd
    raise ConvertException(1, 2, _("Couldn't import mx.DateTime"))
    txn = self.begin_transaction()
    cur = self._db.cursor(txn=txn)
    key, data = cur.first()
    while key is not None:
        dict = pickle.loads(data)
        if isinstance(dict['pub_date'], mxd.DateTimeType):
            t = time.gmtime(time.mktime(round_second(p.tuple())))
        data = pickle.dumps(dict)
        # Rewrite the row in place at the cursor position.
        cur.put(key, data, DB_CURRENT)
    except Exception, ex:

def convert_2_3(self):
    """v2 -> v3 conversion: derive per-image reference counts.

    NOTE(review): extract is fragmentary -- try openers, loop bodies
    and commit lines are missing.
    """
    parts = key.split(':')
    return parts[0].isdigit() and parts[1].isdigit()
    imagelistcursor = None
    txn = self.begin_transaction()
    cur = self._db.cursor(txn=txn)
    key, data = cur.first()
    while key is not None:
        dic = pickle.loads(data)
        # Tally how many items reference each image.
        for image in dic['images']:
            images[image] = images.get(image, 0) + 1
        elif key == "images":
            # Remember where the image-list row lives so it can be
            # rewritten in place after the scan.
            imagelistcursor = cur.dup(DB_POSITION)
    for image, count in images.items():
        key = ("imagecount:" + image).encode('utf-8')
        cur.put(key, str(count), DB_KEYFIRST)
    imagelistcursor.put("images", pickle.dumps(images.keys()), DB_CURRENT)
    if imagelistcursor != None:
        imagelistcursor.close()
    except Exception, ex:
# Write-behind action objects: ItemStore queues these and its timer
# later calls doit(db) against the backing MyDB.
# NOTE(review): in this extract every class below is missing its
# __init__ attribute assignments and its "def doit(self, db):" header;
# the db.* call shown is the body of the missing doit method.

class ModifyItemAction:
    def __init__(self, item):
    # doit body:
    db.modify_items([self._item])

class ModifyItemsAction:
    def __init__(self, items):
    # doit body:
    db.modify_items(self._items)

class ItemsAddedAction:
    def __init__(self, feed, items):
    # doit body:
    db.add_items(self._feed, self._items)

class DeleteItemsAction:
    def __init__(self, feed, items):
    # doit body:
    db.delete_items(self._feed, self._items)

class ImageUpdateAction:
    def __init__(self, url, image):
    # doit body:
    db.update_image(self._url, self._image)

class ImageCountChangedAction:
    def __init__(self, url, count):
    # doit body:
    db.update_image_count(self._url, self._count)
def __init__(self, dbhome):
    """Open the backing MyDB under *dbhome* and wire up feed/image
    signals so changes get queued for write-behind persistence.

    NOTE(review): the ImageUpdatedSignal callback argument lines are
    missing from this extract.
    """
    feedlist = feeds.get_instance()
    self._db = MyDB(DATABASE_FILE_NAME, dbhome, create = 1)
    self.connect_signals()
    feedlist.connect('updated', self._feed_created_cb)
    feedlist.connect('deleted', self._feed_deleted_cb)
    ImageCache.cache.signal_connect(Event.ImageUpdatedSignal,
    # Pending actions drained periodically by the _run timer callback.
    self._action_queue = []
517 def _feed_created_cb(self
, flist
, feed
, *args
):
518 self
._connect
_feed
_signals
(feed
)
520 def _feed_deleted_cb(self
, flist
, feed
):
521 self
._disconnect
_feed
_signals
(feed
)
def connect_signals(self):
    # Subscribe to every existing feed at startup.
    # NOTE(review): the 'for f in flist:' loop header is missing here.
    flist = feeds.get_instance().flatten_list()
    self._connect_feed_signals(f)

def _connect_feed_signals(self, feed):
    """Subscribe to the per-feed signals that require persistence."""
    feed.connect('items-updated', self.items_added)
    feed.connect('items-read', self.all_items_read)
    feed.connect('items-deleted', self.items_deleted)
    #feed.signal_connect(Event.ItemReadSignal, self.item_modified)
    #feed.signal_connect(Event.ItemStickySignal, self.item_modified)

def _disconnect_feed_signals(self, feed):
    # Disconnection is currently a no-op; the old signal API calls are
    # kept commented out.  NOTE(review): the method's executable body
    # (likely 'pass') is missing from this extract.
    #feed.signal_disconnect(Event.NewItemsSignal, self.items_added)
    #feed.signal_disconnect(Event.ItemReadSignal, self.item_modified)
    #feed.signal_disconnect(Event.ItemStickySignal, self.item_modified)
    #feed.signal_disconnect(Event.AllItemsReadSignal, self.all_items_read)
    #feed.signal_disconnect(Event.ItemDeletedSignal, self.item_deleted)
def modify_item(self, item):
    """Queue a deferred write that re-saves a single item."""
    action = ModifyItemAction(item)
    self._action_queue.append(action)
def image_updated(self, signal):
    """ImageUpdatedSignal handler: queue a deferred image write."""
    action = ImageUpdateAction(signal.url, signal.data)
    self._action_queue.append(action)
def read_image(self, url):
    """Fetch the stored image bytes for *url* directly from the database."""
    database = self._db
    return database.get_image_data(url)
def items_deleted(self, feed, items):
    """'items-deleted' handler: queue deletion of *items* from *feed*."""
    pending = DeleteItemsAction(feed, items)
    self._action_queue.append(pending)
def item_modified(self, signal):
    """Signal adapter: forward the changed item to modify_item()."""
    self.modify_item(signal.item)
def all_items_read(self, feed, items):
    """'items-read' handler: queue a bulk re-save of *items*."""
    batch = ModifyItemsAction(items)
    self._action_queue.append(batch)
def items_added(self, feed, items):
    """'items-updated' handler: queue persisting the new *items* of *feed*."""
    queued = ItemsAddedAction(feed, items)
    self._action_queue.append(queued)
def read_feed_items(self, feed):
    """Synchronously load every stored item of *feed* from the database."""
    database = self._db
    return database.get_feed_items(feed)
def get_number_of_unread(self, feed_id, cutoff):
    """Delegate the unread-count query to the backing database."""
    database = self._db
    return database.get_number_of_unread(feed_id, cutoff)
def get_image_counts(self):
    """Delegate the image reference-count query to the backing database."""
    return self._db.get_image_counts()
def set_image_count(self, image, count):
    """Queue a deferred update of an image's reference count."""
    change = ImageCountChangedAction(image, count)
    self._action_queue.append(change)
# start(): begin draining the action queue every 5 seconds.
# NOTE(review): the method header is missing from this extract.
mlmgr = MainloopManager.get_instance()
mlmgr.set_repeating_timer(5000, self._run)

# stop(): cancel the timer and checkpoint the database.
# NOTE(review): method header and surrounding lines missing here.
mlmgr = MainloopManager.get_instance()
mlmgr.end_repeating_timer(self._run)
self._db.checkpoint()
self._db.checkpoint()

# _run(): timer callback -- time bookkeeping, then pop-and-execute
# every queued action against the database, checkpointing afterwards.
# NOTE(review): header, try opener and several lines missing here.
prevtime = time.time()
tmptime = time.time()
timer += tmptime - prevtime
cptimer += tmptime - prevtime
while len(self._action_queue):
    action = self._action_queue.pop(0)
    action.doit(self._db)
except IndexError, e:
self._db.checkpoint()

# Module-level singleton accessor for the ItemStore.
# NOTE(review): the accessor's def line is missing from this extract.
itemstore_instance = None
global itemstore_instance
if itemstore_instance is None:
    itemstore_instance = ItemStore(Config.straw_home())
return itemstore_instance
def stringify_item(item):
    """Serialize a SummaryItem into a pickled plain-dict string.

    Counterpart of unstringify_item().
    NOTE(review): extract is fragmentary -- the dict opener and the
    title/link/guid/seen/id entries are missing here.
    """
    'description': item.description,
    'guidislink': item.guidislink,
    'pub_date': item.pub_date,
    'source': item.source,
    'images': item.image_keys(),
    'fm_license': item.fm_license,
    'fm_changes': item.fm_changes,
    'creator': item.creator,
    'contributors': item.contributors,
    'license_urls': item.license_urls,
    'publication_name': item.publication_name,
    'publication_volume': item.publication_volume,
    'publication_number': item.publication_number,
    'publication_section': item.publication_section,
    'publication_starting_page': item.publication_starting_page,
    'sticky': item._sticky,
    'enclosures': item.enclosures}
    return pickle.dumps(itemdict)
def unstringify_item(itemstring):
    """Rebuild a SummaryItem from a pickled dict made by stringify_item.

    Optional fields are read with dict.get() and a default so rows
    written by older schema versions still load.
    NOTE(review): extract is fragmentary -- the None-guard and the
    return statement are missing here.
    """
    idict = _unpickle(itemstring)
    item = SummaryItem.SummaryItem()
    item.title = idict['title']
    item.link = idict['link']
    item.description = idict['description']
    item.guid = idict['guid']
    item.pub_date = idict['pub_date']
    item.source = idict['source']
    for i in idict['images']:
        item.restore_image(i)
    item.seen = idict['seen']
    item.id = idict['id']
    item.guidislink = idict.get('guidislink', True)
    item.fm_license = idict.get('fm_license', None)
    item.fm_changes = idict.get('fm_changes', None)
    item.creator = idict.get('creator', None)
    item.contributors = idict.get('contributors', None)
    item.license_urls = idict.get('license_urls', None)
    item._sticky = idict.get('sticky', 0)
    item.enclosures = idict.get('enclosures', None)
    item.publication_name = idict.get('publication_name', None)
    item.publication_volume = idict.get('publication_volume', None)
    item.publication_number = idict.get('publication_number', None)
    item.publication_section = idict.get('publication_section', None)
    item.publication_starting_page = idict.get('publication_starting_page', None)
def _unpickle(istring):
    """pickle.loads wrapper that logs bad rows instead of crashing.

    NOTE(review): extract is fragmentary -- try opener, the handler
    bodies and the return are missing.
    """
    itemdict = pickle.loads(istring)
    except ValueError, ve:
        # NOTE(review): 'itemstring' below looks like a latent bug --
        # this function's parameter is named 'istring'; confirm upstream.
        log("ItemStore.unstringify_item: pickle.loads raised ValueError, argument was %s" % repr(itemstring))
    except Exception, ex:
if __name__ == '__main__':
    # Debug utility: open the store under ~/.straw and dump it raw.
    from pprint import pprint
    home_dir = os.getenv('HOME')
    db = MyDB("itemstore.db", "%s/.straw" % home_dir, create = 1)
    db._db_print(db._data_print)