move get_main_database() to mygpo.db.couchdb
[mygpo.git] / mygpo / couch.py
blob 03f7df1d321c3544e593a3286d3552ed59dea38d
1 from operator import itemgetter
2 from collections import namedtuple
4 from couchdbkit import *
class BulkException(Exception):
    """ Signals that some documents of a bulk operation could not be saved

    The individual failures are available through the ``errors`` attribute.
    """

    def __init__(self, errors):
        """ Stores the given collection of errors on the exception """
        self.errors = errors
# Record describing a single failed document of a bulk operation:
# the document itself, the error identifier and the (optional) reason.
BulkError = namedtuple('BulkError', ['doc', 'error', 'reason'])
def __default_reload(db, obj):
    """ Fetches a fresh copy of obj, looked up by its _id

    Document instances are reloaded through the ``get`` method of their own
    class; any other object is looked up by subscripting db with the id.
    """
    doc_id = obj._id

    if not isinstance(obj, Document):
        return db[doc_id]

    return obj.__class__.get(doc_id)
# Picks the first element (the object) out of an (obj, mod_f) pair.
__get_obj = itemgetter(0)
def bulk_save_retry(obj_funs, db=None, reload_f=__default_reload):
    """ Saves multiple documents and retries conflicted ones

    Objects to be saved are passed as (obj, mod_f), where obj is the CouchDB
    document and mod_f is the modification function that should be applied
    to it. mod_f is called with the document and its return value is what
    gets saved; a pair whose mod_f returns None is dropped from the batch.

    If saving a document fails with a conflict, it is fetched again through
    reload_f(db, obj), the modification function is applied again and
    saving is retried. Documents that fail for any other reason are not
    retried; they are collected and reported together once no conflicted
    documents are left.

    Args:
        obj_funs: iterable of (obj, mod_f) pairs.
        db: database to save to; get_main_database() is used when None.
        reload_f: callable taking (db, obj) and returning a fresh obj.

    Raises:
        BulkException: if at least one document failed with an error other
            than 'conflict'. Its ``errors`` attribute holds one BulkError
            (doc, error, reason) per such document.
    """

    # Test for None explicitly so that a database object passed by the
    # caller is always used, even if it evaluates as falsy.
    if db is None:
        # NOTE(review): get_main_database is neither defined nor explicitly
        # imported in this module -- confirm that it is in scope here.
        db = get_main_database()

    errors = []

    # Real lists are used throughout (instead of filter() / map()) because
    # the pairs are walked twice per round: once to save and once to match
    # them against the per-document results.
    pending = list(obj_funs)

    while True:

        # apply modification function (and keep funs)
        modified = [(f(o), f) for (o, f) in pending]

        # drop those for which the modification yielded None
        pending = [(o, f) for (o, f) in modified if o is not None]

        if not pending:
            break

        docs = [o for (o, _) in pending]

        try:
            db.save_docs(docs)

        except BulkSaveError as ex:

            retry = []
            for res, (obj, f) in zip(ex.results, pending):
                error = res.get('error')

                if error == 'conflict':
                    # reload conflicted object and try it again
                    retry.append((reload_f(db, obj), f))

                elif error:
                    # don't retry other errors
                    errors.append(BulkError(obj, error, res.get('reason')))

            pending = retry

        else:
            # everything that was still pending has been saved
            break

    # Report only after all retries are finished, so that a non-conflict
    # failure neither gets lost nor aborts the retry of conflicted docs.
    if errors:
        raise BulkException(errors)