improve / refactor feed-downloader
[mygpo.git] / mygpo / db / couchdb / __init__.py
blobb991cd1a2b0e8f256f003ed1b5fd6d4c9be34ee6
1 from operator import itemgetter
2 from collections import namedtuple
4 from couchdbkit.ext.django import loading
6 from couchdbkit import *
8 import logging
9 logger = logging.getLogger(__name__)
12 def get_main_database():
13 return loading.get_db('core')
16 def get_categories_database():
17 """ returns the database that contains Category documents """
18 return loading.get_db('categories')
21 def get_pubsub_database():
22 return loading.get_db('pubsub')
25 def get_userdata_database():
26 return loading.get_db('userdata')
28 def get_database(user=None):
29 return get_main_database()
32 class BulkException(Exception):
34 def __init__(self, errors):
35 self.errors = errors
38 BulkError = namedtuple('BulkError', 'doc error reason')
41 def __default_reload(db, obj):
42 doc = db[obj._id]
43 return obj.__class__.wrap(doc)
46 __get_obj = itemgetter(0)
48 def bulk_save_retry(obj_funs, db, reload_f=__default_reload):
49 """ Saves multiple documents and retries failed ones
51 Objects to be saved are passed as (obj, mod_f), where obj is the CouchDB
52 and mod_f is the modification function that should be applied to it.
54 If saving a document fails, it is again fetched from the database, the
55 modification function is applied again and saving is retried. """
57 errors = []
59 while True:
61 # apply modification function (and keep funs)
62 obj_funs = [(f(o), f) for (o, f) in obj_funs]
64 # filter those with obj None
65 obj_funs = filter(lambda of: __get_obj(of) is not None, obj_funs)
67 # extract objects
68 objs = map(__get_obj, obj_funs)
70 if not objs:
71 return
73 try:
74 db.save_docs(objs)
75 return
77 except BulkSaveError as ex:
79 new_obj_funs = []
80 for res, (obj, f) in zip(ex.results, obj_funs):
81 if res.get('error', False) == 'conflict':
83 # reload conflicted object
84 obj = reload_f(db, obj)
85 new_obj_funs.append( (obj, f) )
87 elif res.get('error', False):
88 # don't retry other errors
89 err = BulkError(obj, res['error'], res.get('reason', None))
90 errors.append(err)
92 obj_funs = new_obj_funs
94 if errors:
95 logger.warn('Errors at bulk-save: %s', errors)
96 raise BulkException(errors)