Move all User and EpisodeUserState DB queries into a separate module
[mygpo.git] / mygpo / maintenance / merge.py
blob2ad0bb8fea9f4623ab6135c0aa9aefa8bc8150be
1 from itertools import chain, imap as map
2 import logging
3 from functools import partial
5 import restkit
7 from mygpo.core.models import Podcast, Episode, PodcastGroup
8 from mygpo.users.models import PodcastUserState, EpisodeUserState
9 from mygpo import utils
10 from mygpo.decorators import repeat_on_conflict
11 from mygpo.db.couchdb.episode import episodes_for_podcast
12 from mygpo.db.couchdb.podcast_state import all_podcast_states
13 from mygpo.db.couchdb.episode_state import all_episode_states
class IncorrectMergeException(Exception):
    """Signals that a requested merge is not allowed.

    The merger classes in this module raise it, e.g. when asked to merge an
    object into itself or to merge states that belong to different users.
    """
def podcast_url_wrapper(r):
    """Wrap a ``podcasts/by_url`` view row and return the podcast for its URL.

    :param r: a view row fetched with ``include_docs=True``; ``r['key']`` is
              the URL and ``r['doc']`` the raw document
    :returns: ``get_podcast_by_url(url)`` of the wrapped Podcast or
              PodcastGroup
    :raises ValueError: if the document is neither a Podcast nor a
                        PodcastGroup
    """
    url = r['key']
    doc = r['doc']
    doc_type = doc['doc_type']

    if doc_type == 'Podcast':
        obj = Podcast.wrap(doc)
    elif doc_type == 'PodcastGroup':
        obj = PodcastGroup.wrap(doc)
    else:
        # without this branch ``obj`` would be unbound below
        raise ValueError('unexpected doc_type {0!r} for URL {1!r}'.format(
            doc_type, url))

    return obj.get_podcast_by_url(url)
def merge_objects(podcasts=True, podcast_states=False, episodes=False,
        episode_states=False, dry_run=False):
    """Merge duplicate objects (podcasts, episodes, states).

    Each enabled flag runs one or two passes. A pass streams a view through
    merge_from_iterator, which compares neighbouring rows with a
    ``should_merge`` predicate and hands every matching pair to a merge
    function.

    :param podcasts: merge podcasts with common URLs, then by old id
    :param podcast_states: merge podcast states that compare equal (``==``)
    :param episodes: merge episodes of the same podcast with common URLs,
                     then by old id
    :param episode_states: merge episode states with equal (user, episode)
    :param dry_run: forwarded to merge_from_iterator

    NOTE(review): similar_oldid, podcast_oldid_wrapper, merge_podcasts,
    merge_podcast_states, merge_episodes and merge_episode_states are not
    defined in this module as seen -- confirm they exist before relying on
    the corresponding passes.
    """

    # The "smaller" podcast is merged into the "greater"
    podcast_merge_order = lambda a, b: cmp(a.subscriber_count(), b.subscriber_count())
    no_merge_order = lambda a, b: 0

    merger = partial(merge_from_iterator, dry_run=dry_run,
                     progress_callback=utils.progress)

    if podcasts:
        print('Merging Podcasts by URL')
        rows, total = get_view_count_iter(Podcast, 'podcasts/by_url',
                                          wrap=False, include_docs=True)
        podcast_iter = map(podcast_url_wrapper, rows)
        merger(podcast_iter, similar_urls, podcast_merge_order, total,
               merge_podcasts)
        print('')

        print('Merging Podcasts by Old-Id')
        rows, total = get_view_count_iter(Podcast, 'podcasts/by_oldid',
                                          wrap=False, include_docs=True)
        # the module imports itertools.imap under the name ``map``; the bare
        # name ``imap`` is not bound here
        podcast_iter = map(podcast_oldid_wrapper, rows)
        merger(podcast_iter, similar_oldid, podcast_merge_order, total,
               merge_podcasts)
        print('')

    if podcast_states:
        print('Merging Duplicate Podcast States')
        state_iter, total = get_view_count_iter(PodcastUserState,
                                                'podcast_states/by_user',
                                                include_docs=True)
        should_merge = lambda a, b: a == b
        merger(state_iter, should_merge, no_merge_order, total,
               merge_podcast_states)
        print('')

    if episodes:
        print('Merging Episodes by URL')
        episode_iter, total = get_view_count_iter(Episode,
                                                  'episodes/by_podcast_url',
                                                  include_docs=True)
        should_merge = lambda a, b: a.podcast == b.podcast and \
                similar_urls(a, b)
        merger(episode_iter, should_merge, no_merge_order, total,
               merge_episodes)
        print('')

        print('Merging Episodes by Old-Id')
        episode_iter, total = get_view_count_iter(Episode,
                                                  'episodes/by_oldid',
                                                  include_docs=True)
        should_merge = lambda a, b: a.podcast == b.podcast and \
                similar_oldid(a, b)
        merger(episode_iter, should_merge, no_merge_order, total,
               merge_episodes)
        print('')

    if episode_states:
        print('Merging Duplicate Episode States')
        state_iter, total = get_view_count_iter(EpisodeUserState,
                                                'episode_states/by_user_episode',
                                                include_docs=True)
        should_merge = lambda a, b: (a.user, a.episode) == \
                (b.user, b.episode)
        merger(state_iter, should_merge, no_merge_order, total,
               merge_episode_states)
        print('')
def get_view_count_iter(cls, view, *args, **kwargs):
    """Return an ``(iterator, total)`` pair for ``view`` of ``cls``.

    The iterator is whatever ``utils.multi_request_view`` returns for the
    given arguments (extra args/kwargs are passed straight through); the
    total is the ``total_rows`` reported by a ``limit=0`` query on the view.
    """
    rows = utils.multi_request_view(cls, view, *args, **kwargs)
    summary = cls.view(view, limit=0)
    return rows, summary.total_rows
def merge_from_iterator(obj_it, should_merge, cmp, total, merge_func,
        dry_run=False, progress_callback=lambda *args, **kwargs: None):
    """
    Iterates over the objects in obj_it and calls should_merge for each pair of
    objects. This implies that the objects returned by obj_it should be sorted
    such that potential merge-candidates appear after each other.

    If should_merge returns True, the pair of objects is going to be merged.
    The smaller object (according to cmp) is merged into the larger one.
    merge_func performs the actual merge. It is passed the two objects to be
    merged (first the larger, then the smaller one).

    :param obj_it: iterable of candidate objects
    :param should_merge: ``should_merge(current, previous) -> bool``
    :param cmp: old-style comparison function returning a negative, zero or
                positive number
    :param total: passed through unchanged to progress_callback
    :param merge_func: called as ``merge_func(larger, smaller, dry_run=...)``
    :param dry_run: forwarded to merge_func
    :param progress_callback: called as ``progress_callback(n, total)`` after
                              every comparison, with n counting from 0
    """
    from functools import cmp_to_key

    obj_it = iter(obj_it)

    try:
        prev = next(obj_it)
    except StopIteration:
        return

    sort_key = cmp_to_key(cmp)

    for n, p in enumerate(obj_it):
        if should_merge(p, prev):
            # sort descending so the larger object comes first, matching the
            # (larger, smaller) contract above; reverse=True keeps the
            # original [p, prev] order when cmp reports a tie
            larger, smaller = sorted([p, prev], key=sort_key, reverse=True)
            logging.info('merging {old} into {new}'.format(old=smaller,
                                                           new=larger))

            merge_func(larger, smaller, dry_run=dry_run)

        # NOTE(review): after a merge ``p`` may be the object merged away;
        # it is still used for the next comparison
        prev = p
        progress_callback(n, total)
class PodcastMerger(object):
    """Merges all further podcasts of ``podcasts`` into the first one.

    For every podcast merged away (``podcast2``), its merged ids, slugs and
    old ids, URLs, content types, subscriber counts and tags are folded into
    the first podcast (``podcast1``); its podcast states are merged into or
    moved to ``podcast1``; its episodes and their episode states are
    re-assigned to ``podcast1``; and ``podcast2`` is deleted. Episodes are
    only merged where the caller grouped them in ``groups`` (via
    EpisodeMerger).

    NOTE(review): ``dry_run`` is stored and forwarded but never consulted;
    every change is written to the database.
    """

    def __init__(self, podcasts, actions, groups, dry_run=False):
        """
        :param podcasts: list of podcasts; the first one is the merge target.
                         Note that merge() pops it from this list.
        :param actions: mapping of counters incremented as work is done
                        (presumably a Counter/defaultdict -- confirm caller)
        :param groups: iterable of ``(n, episodes)`` pairs; within each
                       episode list the later episodes are merged into the
                       first one
        :param dry_run: stored only (see class docstring)
        :raises IncorrectMergeException: if a podcast appears twice
        """
        for n, podcast1 in enumerate(podcasts):
            for m, podcast2 in enumerate(podcasts):
                if podcast1 == podcast2 and n != m:
                    raise IncorrectMergeException("can't merge podcast into itself")

        self.podcasts = podcasts
        self.actions = actions
        self.groups = groups
        self.dry_run = dry_run

    def merge(self):
        """Merge every further podcast into the first one."""
        podcast1 = self.podcasts.pop(0)

        for index, podcast2 in enumerate(self.podcasts):
            self._merge_objs(podcast1=podcast1, podcast2=podcast2)
            self.merge_states(podcast1, podcast2)

            # merge_episodes consumes the episode groups (it pops each
            # group's target episode), so it must run only once; running it
            # again for further podcasts would re-merge episodes that were
            # already deleted, or pop from an empty list
            if index == 0:
                self.merge_episodes()

            self.reassign_episodes(podcast1, podcast2)
            self._delete(podcast2=podcast2)

        self.actions['merge-podcast'] += 1

    def merge_episodes(self):
        """Merge each group's later episodes into its first episode.

        The episode lists in ``self.groups`` are modified in place.
        """
        for _, episodes in self.groups:
            if not episodes:
                continue

            episode = episodes.pop(0)

            for ep in episodes:
                em = EpisodeMerger(episode, ep, self.actions,
                                   dry_run=self.dry_run)
                em.merge()

    @repeat_on_conflict(['podcast1', 'podcast2'])
    def _merge_objs(self, podcast1, podcast2):
        """Fold podcast2's identifiers and metadata into podcast1 and save it."""
        podcast1.merged_ids = set_filter(podcast1.merged_ids,
                                         [podcast2.get_id()],
                                         podcast2.merged_ids)

        podcast1.merged_slugs = set_filter(podcast1.merged_slugs,
                                           [podcast2.slug],
                                           podcast2.merged_slugs)

        podcast1.merged_oldids = set_filter(podcast1.merged_oldids,
                                            [podcast2.oldid],
                                            podcast2.merged_oldids)

        # the first URL in the list represents the podcast main URL;
        # set_filter keeps neither order nor falsy values, so move the main
        # URL back to the front if it survived
        main_url = podcast1.url
        podcast1.urls = set_filter(podcast1.urls, podcast2.urls)
        if main_url in podcast1.urls:
            podcast1.urls.remove(main_url)
            podcast1.urls.insert(0, main_url)

        # we ignore related_podcasts because
        # * the elements should be roughly the same
        # * element order is important but could not be preserved exactly

        podcast1.content_types = set_filter(podcast1.content_types,
                                            podcast2.content_types)

        key = lambda x: x.timestamp
        for a, b in utils.iterate_together(
                [podcast1.subscribers, podcast2.subscribers], key):

            if a is None or b is None:
                continue

            # avoid increasing subscriber_count when merging
            # duplicate entries of a single podcast
            if a.subscriber_count == b.subscriber_count:
                continue

            a.subscriber_count += b.subscriber_count

        for src, tags in podcast2.tags.items():
            podcast1.tags[src] = set_filter(podcast1.tags.get(src, []), tags)

        podcast1.save()

    @repeat_on_conflict(['podcast2'])
    def _delete(self, podcast2):
        podcast2.delete()

    @repeat_on_conflict(['s'])
    def _save_state(self, s, podcast1):
        s.podcast = podcast1.get_id()
        s.save()

    @repeat_on_conflict(['e'])
    def _save_episode(self, e, podcast1):
        e.podcast = podcast1.get_id()
        e.save()

    def reassign_episodes(self, podcast1, podcast2):
        """Point podcast2's episodes and their episode states at podcast1.

        If necessary, these episodes are merged later anyway.
        """
        for e in episodes_for_podcast(podcast2):
            self.actions['reassign-episode'] += 1

            for s in all_episode_states(e):
                self.actions['reassign-episode-state'] += 1
                self._save_state(s=s, podcast1=podcast1)

            self._save_episode(e=e, podcast1=podcast1)

    def merge_states(self, podcast1, podcast2):
        """Merges the Podcast states that are associated with the two Podcasts.

        This should be done after two podcasts are merged. States are paired
        by user (via utils.iterate_together); a podcast2 state whose user has
        no podcast1 state is moved to podcast1, otherwise both states are
        merged with PodcastStateMerger.
        """
        key = lambda x: x.user
        states1 = sorted(all_podcast_states(podcast1), key=key)
        states2 = sorted(all_podcast_states(podcast2), key=key)

        for state, state2 in utils.iterate_together([states1, states2], key):

            if state == state2:
                continue

            if state is None:
                self.actions['move-podcast-state'] += 1
                self._move_state(state2=state2, new_id=podcast1.get_id(),
                                 new_url=podcast1.url)

            elif state2 is None:
                continue

            else:
                psm = PodcastStateMerger(state, state2, self.actions,
                                         dry_run=self.dry_run)
                psm.merge()

    @repeat_on_conflict(['state2'])
    def _move_state(self, state2, new_id, new_url):
        state2.ref_url = new_url
        state2.podcast = new_id
        state2.save()

    @repeat_on_conflict(['state2'])
    def _delete_state(self, state2):
        state2.delete()
def similar_urls(a, b):
    """Decide whether two Podcasts/Episodes should be merged by URL.

    Returns True when ``utils.intersect`` finds any entry common to
    ``a.urls`` and ``b.urls``, False otherwise.
    """
    shared = utils.intersect(a.urls, b.urls)
    return bool(shared)
class EpisodeMerger(object):
    """Merges episode2 into episode1.

    episode2's URLs, merged ids and slugs are folded into episode1. The
    episode states of both episodes are paired by user (via
    utils.iterate_together) and then merged, or moved to episode1 when
    episode1 has no state for that user. Finally episode2 is deleted.

    NOTE(review): ``dry_run`` is stored and forwarded but never consulted;
    every change is written to the database.
    """

    def __init__(self, episode1, episode2, actions, dry_run=False):
        """
        :param actions: mapping of counters incremented as work is done
        :raises IncorrectMergeException: if both episodes are equal
        """
        if episode1 == episode2:
            raise IncorrectMergeException("can't merge episode into itself")

        self.episode1 = episode1
        self.episode2 = episode2
        self.actions = actions
        self.dry_run = dry_run

    def merge(self):
        """Perform the merge and bump the ``merge-episode`` counter."""
        self._merge_objs(episode1=self.episode1, episode2=self.episode2)
        self.merge_states(self.episode1, self.episode2)
        self._delete(e=self.episode2)
        self.actions['merge-episode'] += 1

    @repeat_on_conflict(['episode1'])
    def _merge_objs(self, episode1, episode2):
        """Fold episode2's URLs, merged ids and slugs into episode1 and save it."""
        # keep episode1's first URL in front (as PodcastMerger does for
        # podcasts) instead of leaving it to set_filter's arbitrary order
        main_url = episode1.urls[0] if episode1.urls else None
        episode1.urls = set_filter(episode1.urls, episode2.urls)
        if main_url in episode1.urls:
            episode1.urls.remove(main_url)
            episode1.urls.insert(0, main_url)

        episode1.merged_ids = set_filter(episode1.merged_ids, [episode2._id],
                                         episode2.merged_ids)

        episode1.merged_slugs = set_filter(episode1.merged_slugs,
                                           [episode2.slug],
                                           episode2.merged_slugs)

        episode1.save()

    @repeat_on_conflict(['e'])
    def _delete(self, e):
        e.delete()

    def merge_states(self, episode, episode2):
        """Merge episode2's states into episode's states, pairing them by user."""
        key = lambda x: x.user
        states1 = sorted(all_episode_states(episode), key=key)
        states2 = sorted(all_episode_states(episode2), key=key)

        for state, state2 in utils.iterate_together([states1, states2], key):

            if state == state2:
                continue

            if state is None:
                self.actions['move-episode-state'] += 1
                self._move(state2=state2, podcast_id=episode.podcast,
                           episode_id=episode._id)

            elif state2 is None:
                continue

            else:
                esm = EpisodeStateMerger(state, state2, self.actions,
                                         dry_run=self.dry_run)
                esm.merge()

    @repeat_on_conflict(['state2'])
    def _move(self, state2, podcast_id, episode_id):
        state2.podcast = podcast_id
        state2.episode = episode_id
        state2.save()
class PodcastStateMerger(object):
    """Merges podcast state ``state2`` into ``state`` and deletes ``state2``.

    Both states must be distinct documents belonging to the same user.
    ``dry_run`` is accepted and stored but not consulted by this class.
    """

    def __init__(self, state, state2, actions, dry_run=False):
        # a document can't be merged into itself
        if state._id == state2._id:
            raise IncorrectMergeException("can't merge podcast state into itself")

        # merging across users would mix up their data
        if state.user != state2.user:
            raise IncorrectMergeException("states don't belong to the same user")

        self.state, self.state2 = state, state2
        self.actions = actions
        self.dry_run = dry_run

    def merge(self):
        """Run the merge and bump the ``merged-podcast-state`` counter."""
        target, source = self.state, self.state2
        self._do_merge(state=target, state2=source)
        self._add_actions(state=target, actions=source.actions)
        self._delete(state2=source)
        self.actions['merged-podcast-state'] += 1

    @repeat_on_conflict(['state'])
    def _do_merge(self, state, state2):
        """Combine settings, disabled devices, merged ids and tags into state."""
        # start from state2's settings and let state's own settings win
        combined = state2.settings
        combined.update(state.settings)
        state.settings = combined

        state.disabled_devices = set_filter(state.disabled_devices,
                                            state2.disabled_devices)
        state.merged_ids = set_filter(state.merged_ids, [state2._id],
                                      state2.merged_ids)
        state.tags = set_filter(state.tags, state2.tags)

        state.save()

    @repeat_on_conflict(['state'])
    def _add_actions(self, state, actions):
        """Append ``actions`` to state, silently giving up on restkit.Unauthorized."""
        try:
            state.add_actions(actions)
            state.save()
        except restkit.Unauthorized:
            # the merge could result in an invalid list of
            # subscribe/unsubscribe actions -- we ignore it and
            # just use the actions from state
            pass

    @repeat_on_conflict(['state2'])
    def _delete(self, state2):
        state2.delete()
class EpisodeStateMerger(object):
    """Merges episode state ``state2`` into ``state`` and deletes ``state2``.

    Both states must be distinct documents belonging to the same user.
    ``dry_run`` is accepted and stored but not consulted by this class.
    """

    def __init__(self, state, state2, actions, dry_run=False):
        # a document can't be merged into itself
        if state._id == state2._id:
            raise IncorrectMergeException("can't merge episode state into itself")

        # merging across users would mix up their data
        if state.user != state2.user:
            raise IncorrectMergeException("states don't belong to the same user")

        self.state, self.state2 = state, state2
        self.actions = actions
        self.dry_run = dry_run

    def merge(self):
        """Run the merge and bump the ``merge-episode-state`` counter."""
        target, source = self.state, self.state2
        self._merge_obj(state=target, state2=source)
        self._do_delete(state2=source)
        self.actions['merge-episode-state'] += 1

    @repeat_on_conflict(['state'])
    def _merge_obj(self, state, state2):
        """Combine actions, settings, merged ids and chapters into state."""
        state.add_actions(state2.actions)

        # start from state2's settings and let state's own settings win
        combined = state2.settings
        combined.update(state.settings)
        state.settings = combined

        all_ids = set(state.merged_ids + [state2._id] + state2.merged_ids)
        state.merged_ids = filter(None, all_ids)

        all_chapters = set(state.chapters + state2.chapters)
        state.chapters = list(all_chapters)

        state.save()

    @repeat_on_conflict(['state2'])
    def _do_delete(self, state2):
        state2.delete()
def set_filter(*args):
    """Return the distinct truthy items of all given iterables as a list.

    Items are de-duplicated through a ``set`` (so they must be hashable), and
    falsy items (``None``, ``''``, ``0``, ...) are dropped. The order of the
    result is the set's iteration order, i.e. unspecified.

    An explicit list is built because callers mutate the result (e.g.
    ``urls.remove(...)`` / ``urls.insert(...)``), while ``filter`` returns a
    list only on Python 2.
    """
    return [item for item in set(chain.from_iterable(args)) if item]