fix variable name
[mygpo.git] / mygpo / maintenance / merge.py
blobd149a9e1d13124537d9d914fab6374705966b1c0
1 from itertools import chain, imap as map
2 import logging
3 from functools import partial
5 import restkit
7 from mygpo.core.models import Podcast, Episode, PodcastGroup
8 from mygpo.users.models import PodcastUserState, EpisodeUserState
9 from mygpo import utils
10 from mygpo.decorators import repeat_on_conflict
11 from mygpo.db.couchdb.episode import episodes_for_podcast
12 from mygpo.db.couchdb.podcast_state import all_podcast_states
13 from mygpo.db.couchdb.episode_state import all_episode_states
14 from mygpo.db.couchdb.utils import multi_request_view
class IncorrectMergeException(Exception):
    """Raised when a requested merge must not be carried out, e.g. merging an
    object into itself or merging states that belong to different users."""
def podcast_url_wrapper(r):
    """Return the podcast that a ``podcasts/by_url`` view row refers to.

    :param r: view row with the URL as ``key`` and the document as ``doc``
        (merge_objects queries the view with ``include_docs=True``)
    :returns: ``get_podcast_by_url(url)`` of the wrapped Podcast or
        PodcastGroup document
    :raises ValueError: if the document is neither a Podcast nor a
        PodcastGroup
    """
    url = r['key']
    doc = r['doc']
    doc_type = doc['doc_type']

    if doc_type == 'Podcast':
        obj = Podcast.wrap(doc)
    elif doc_type == 'PodcastGroup':
        obj = PodcastGroup.wrap(doc)
    else:
        # fail loudly instead of hitting an unbound `obj` below
        raise ValueError('unexpected doc_type {0!r} for URL {1!r}'.format(
            doc_type, url))

    return obj.get_podcast_by_url(url)
32 def merge_objects(podcasts=True, podcast_states=False, episodes=False,
33 episode_states=False, dry_run=False):
34 """
35 Merges objects (podcasts, episodes, states) based on different criteria
36 """
38 # The "smaller" podcast is merged into the "greater"
39 podcast_merge_order = lambda a, b: cmp(a.subscriber_count(), b.subscriber_count())
40 no_merge_order = lambda a, b: 0
42 merger = partial(merge_from_iterator, dry_run=dry_run,
43 progress_callback=utils.progress)
46 if podcasts:
48 print 'Merging Podcasts by URL'
49 podcasts, total = get_view_count_iter(Podcast,
50 'podcasts/by_url',
51 wrap = False,
52 include_docs=True)
53 podcasts = map(podcast_url_wrapper, podcasts)
54 merger(podcasts, similar_urls, podcast_merge_order, total,
55 merge_podcasts)
56 print
59 print 'Merging Podcasts by Old-Id'
60 podcasts, total = get_view_count_iter(Podcast,
61 'podcasts/by_oldid',
62 wrap = False,
63 include_docs=True)
64 podcasts = imap(podcast_oldid_wrapper, podcasts)
65 merger(podcasts, similar_oldid, podcast_merge_order, total,
66 merge_podcasts)
67 print
70 if podcast_states:
71 print 'Merging Duplicate Podcast States'
72 states, total = get_view_count_iter(PodcastUserState,
73 'podcast_states/by_user',
74 include_docs=True)
75 should_merge = lambda a, b: a == b
76 merger(states, should_merge, no_merge_order, total,
77 merge_podcast_states)
78 print
81 if episodes:
82 print 'Merging Episodes by URL'
83 episodes, total = get_view_count_iter(Episode,
84 'episodes/by_podcast_url',
85 include_docs=True)
86 should_merge = lambda a, b: a.podcast == b.podcast and \
87 similar_urls(a, b)
88 merger(episodes, should_merge, no_merge_order, total, merge_episodes)
89 print
92 print 'Merging Episodes by Old-Id'
93 episodes, total = get_view_count_iter(Episode,
94 'episodes/by_oldid',
95 include_docs=True)
96 should_merge = lambda a, b: a.podcast == b.podcast and \
97 similar_oldid(a, b)
98 merger(episodes, should_merge, no_merge_order, total, merge_episodes)
99 print
102 if episode_states:
103 print 'Merging Duplicate Episode States'
104 states, total = get_view_count_iter(EpisodeUserState,
105 'episode_states/by_user_episode',
106 include_docs=True)
107 should_merge = lambda a, b: (a.user, a.episode) == \
108 (b.user, b.episode)
109 merger(states, should_merge, no_merge_order, total,
110 merge_episode_states)
111 print
def get_view_count_iter(cls, view, *args, **kwargs):
    """Return ``(row_iterator, total_rows)`` for *view* of document class *cls*.

    The iterator comes from ``multi_request_view`` and receives all extra
    positional/keyword arguments. The total is the ``total_rows`` of a
    separate ``cls.view(view, limit=0)`` query, which does not receive them.
    """
    return (multi_request_view(cls, view, *args, **kwargs),
            cls.view(view, limit=0).total_rows)
def merge_from_iterator(obj_it, should_merge, cmp, total, merge_func,
                        dry_run=False,
                        progress_callback=lambda *args, **kwargs: None):
    """
    Iterates over the objects in obj_it and calls should_merge for each pair of
    adjacent objects. This implies that the objects returned by obj_it should
    be sorted such that potential merge-candidates appear after each other.

    If should_merge returns True, the pair of objects is going to be merged.
    The smaller object (according to cmp) is merged into the larger one.
    merge_func performs the actual merge. It is passed the two objects to be
    merged (first the larger, then the smaller one) and ``dry_run`` as a
    keyword argument. If cmp considers them equal, the later object is
    passed first.

    After a merge the surviving (larger) object is compared with the next
    one, so a run of three or more duplicates collapses into one object.

    progress_callback(n, total) is called after every pair; n counts pairs
    starting at 0.
    """
    obj_it = iter(obj_it)

    try:
        prev = next(obj_it)
    except StopIteration:
        return

    for n, p in enumerate(obj_it):
        if should_merge(p, prev):
            # order as (larger, smaller); on a tie keep (p, prev), which is
            # what a stable sort of [p, prev] produced
            if cmp(p, prev) >= 0:
                larger, smaller = p, prev
            else:
                larger, smaller = prev, p

            logging.info('merging {old} into {new}'.
                         format(old=smaller, new=larger))

            merge_func(larger, smaller, dry_run=dry_run)

            # the smaller object has been merged away; compare the next
            # object against the survivor instead
            prev = larger
        else:
            prev = p

        progress_callback(n, total)
class PodcastMerger(object):
    """ Merges podcasts[1:] into podcasts[0]

    For every podcast that is merged away:
    * its ids, slugs, old ids, URLs, content types, tags and subscriber counts
      are merged into the target,
    * its podcast states are merged into or moved to the target,
    * its episodes and their episode states are re-assigned to the target,
    * it is deleted.

    Episodes are merged only as described by ``groups``: an iterable of
    ``(n, episodes)`` pairs where every episode after the first one in
    ``episodes`` is merged into the first one.
    """

    def __init__(self, podcasts, actions, groups, dry_run=False):
        """
        :param podcasts: list of podcasts; the first one is the merge target.
            merge() pops the target from this very list.
        :param actions: mapping of action name to count; entries are
            incremented with ``+= 1``, so it presumably defaults missing keys
            to 0 (e.g. a Counter) -- confirm with callers
        :param groups: ``(n, episodes)`` pairs, see the class docstring
        :param dry_run: stored and forwarded to the sub-mergers; NOTE(review):
            no merge step in this module consults it, changes are written
            regardless
        :raises IncorrectMergeException: if a podcast appears twice
        """
        for n, podcast1 in enumerate(podcasts):
            for m, podcast2 in enumerate(podcasts):
                if podcast1 == podcast2 and n != m:
                    raise IncorrectMergeException("can't merge podcast into itself")

        self.podcasts = podcasts
        self.actions = actions
        self.groups = groups
        self.dry_run = dry_run

    def merge(self):
        """Merges all podcasts into the first one and deletes the others."""
        podcast1 = self.podcasts.pop(0)

        # Merge the episode groups exactly once: merge_episodes consumes the
        # group lists (pops their first episode and deletes the merged ones),
        # so running it per merged-away podcast would re-merge deleted
        # episodes and fail on exhausted groups.
        self.merge_episodes()

        for podcast2 in self.podcasts:
            self._merge_objs(podcast1=podcast1, podcast2=podcast2)
            self.merge_states(podcast1, podcast2)
            self.reassign_episodes(podcast1, podcast2)
            self._delete(podcast2=podcast2)

        self.actions['merge-podcast'] += 1

    def merge_episodes(self):
        """Merges every episode of a group into the group's first episode.

        Pops the first episode from each group list; empty groups are skipped.
        """
        for _, episodes in self.groups:
            if not episodes:
                continue

            episode = episodes.pop(0)

            for ep in episodes:
                em = EpisodeMerger(episode, ep, self.actions,
                                   dry_run=self.dry_run)
                em.merge()

    @repeat_on_conflict(['podcast1', 'podcast2'])
    def _merge_objs(self, podcast1, podcast2):
        """Merges podcast2's identifiers and metadata into podcast1, saves it."""

        podcast1.merged_ids = set_filter(podcast1.get_id(),
            podcast1.merged_ids, [podcast2.get_id()], podcast2.merged_ids)

        podcast1.merged_slugs = set_filter(podcast1.slug,
            podcast1.merged_slugs, [podcast2.slug], podcast2.merged_slugs)

        podcast1.merged_oldids = set_filter(podcast1.oldid,
            podcast1.merged_oldids, [podcast2.oldid],
            podcast2.merged_oldids)

        # the first URL in the list represents the podcast main URL
        main_url = podcast1.url
        podcast1.urls = set_filter(None, podcast1.urls, podcast2.urls)
        # set_filter does not keep the order, so we insert it as the first
        # again; guarded because list.remove() raises ValueError for a falsy
        # (filtered out) or unlisted main URL
        if main_url:
            if main_url in podcast1.urls:
                podcast1.urls.remove(main_url)
            podcast1.urls.insert(0, main_url)

        # we ignore related_podcasts because
        # * the elements should be roughly the same
        # * element order is important but could not preserved exactly

        podcast1.content_types = set_filter(None, podcast1.content_types,
            podcast2.content_types)

        key = lambda x: x.timestamp
        for a, b in utils.iterate_together(
                [podcast1.subscribers, podcast2.subscribers], key):

            if a is None or b is None:
                continue

            # avoid increasing subscriber_count when merging
            # duplicate entries of a single podcast
            if a.subscriber_count == b.subscriber_count:
                continue

            a.subscriber_count += b.subscriber_count

        for src, tags in podcast2.tags.items():
            podcast1.tags[src] = set_filter(None, podcast1.tags.get(src, []),
                tags)

        podcast1.save()

    @repeat_on_conflict(['podcast2'])
    def _delete(self, podcast2):
        podcast2.delete()

    @repeat_on_conflict(['s'])
    def _save_state(self, s, podcast1):
        s.podcast = podcast1.get_id()
        s.save()

    @repeat_on_conflict(['e'])
    def _save_episode(self, e, podcast1):
        e.podcast = podcast1.get_id()
        e.save()

    def reassign_episodes(self, podcast1, podcast2):
        """Moves all episodes of podcast2, and their episode states, to podcast1.

        Duplicate episodes are not merged here; episode merging is driven
        by ``groups``.
        """
        for e in episodes_for_podcast(podcast2):
            self.actions['reassign-episode'] += 1

            for s in all_episode_states(e):
                self.actions['reassign-episode-state'] += 1

                self._save_state(s=s, podcast1=podcast1)

            self._save_episode(e=e, podcast1=podcast1)

    def merge_states(self, podcast1, podcast2):
        """Merges the Podcast states that are associated with the two Podcasts.

        States are matched by user (utils.iterate_together presumably yields
        aligned pairs with None for a missing side). A user with a state only
        for podcast2 gets it moved to podcast1; a user with states for both
        gets them merged by PodcastStateMerger.

        This should be done after two podcasts are merged
        """
        key = lambda x: x.user
        states1 = sorted(all_podcast_states(podcast1), key=key)
        states2 = sorted(all_podcast_states(podcast2), key=key)

        for state, state2 in utils.iterate_together([states1, states2], key):

            if state == state2:
                continue

            if state is None:
                self.actions['move-podcast-state'] += 1
                self._move_state(state2=state2, new_id=podcast1.get_id(),
                                 new_url=podcast1.url)

            elif state2 is None:
                continue

            else:
                psm = PodcastStateMerger(state, state2, self.actions,
                                         dry_run=self.dry_run)
                psm.merge()

    @repeat_on_conflict(['state2'])
    def _move_state(self, state2, new_id, new_url):
        state2.ref_url = new_url
        state2.podcast = new_id
        state2.save()

    @repeat_on_conflict(['state2'])
    def _delete_state(self, state2):
        state2.delete()
def similar_urls(a, b):
    """Return True when the two Podcasts/Episodes have at least one URL in
    common (their ``urls`` intersect); used as a merge predicate."""
    shared = utils.intersect(a.urls, b.urls)
    return True if shared else False
class EpisodeMerger(object):
    """Merges episode2 into episode1, together with their episode states,
    and deletes episode2."""

    def __init__(self, episode1, episode2, actions, dry_run=False):
        """
        :param actions: mapping of action name to count, incremented in place
        :param dry_run: stored and forwarded; NOTE(review): not consulted by
            any step, changes are written regardless
        :raises IncorrectMergeException: if both episodes are equal
        """
        if episode1 == episode2:
            raise IncorrectMergeException("can't merge episode into itself")

        self.episode1 = episode1
        self.episode2 = episode2
        self.actions = actions
        self.dry_run = dry_run

    def merge(self):
        self._merge_objs(episode1=self.episode1, episode2=self.episode2)
        self.merge_states(self.episode1, self.episode2)
        self._delete(e=self.episode2)
        self.actions['merge-episode'] += 1

    @repeat_on_conflict(['episode1'])
    def _merge_objs(self, episode1, episode2):
        """Merges episode2's URLs, ids and slugs into episode1 and saves it."""

        episode1.urls = set_filter(None, episode1.urls, episode2.urls)

        episode1.merged_ids = set_filter(episode1._id, episode1.merged_ids,
            [episode2._id], episode2.merged_ids)

        episode1.merged_slugs = set_filter(episode1.slug,
            episode1.merged_slugs, [episode2.slug], episode2.merged_slugs)

        episode1.save()

    @repeat_on_conflict(['e'])
    def _delete(self, e):
        e.delete()

    def merge_states(self, episode, episode2):
        """Merges the episode states of episode2 into those of episode.

        States are matched by user. A user with a state only for episode2 gets
        it moved to episode; a user with states for both gets them merged by
        EpisodeStateMerger.
        """
        key = lambda x: x.user
        # use the arguments rather than self.episode1/self.episode2 so the
        # method operates on what it is given
        states1 = sorted(all_episode_states(episode), key=key)
        states2 = sorted(all_episode_states(episode2), key=key)

        for state, state2 in utils.iterate_together([states1, states2], key):

            if state == state2:
                continue

            if state is None:
                self.actions['move-episode-state'] += 1
                self._move(state2=state2, podcast_id=episode.podcast,
                           episode_id=episode._id)

            elif state2 is None:
                continue

            else:
                esm = EpisodeStateMerger(state, state2, self.actions,
                                         dry_run=self.dry_run)
                esm.merge()

    @repeat_on_conflict(['state2'])
    def _move(self, state2, podcast_id, episode_id):
        state2.podcast = podcast_id
        state2.episode = episode_id
        state2.save()
class PodcastStateMerger(object):
    """Merges the two given podcast states

    state2 is merged into state and deleted afterwards.
    """

    def __init__(self, state, state2, actions, dry_run=False):
        """
        :raises IncorrectMergeException: if both are the same state or they
            belong to different users
        """
        if state._id == state2._id:
            raise IncorrectMergeException("can't merge podcast state into itself")

        if state.user != state2.user:
            raise IncorrectMergeException("states don't belong to the same user")

        self.state = state
        self.state2 = state2
        self.actions = actions
        self.dry_run = dry_run

    def merge(self):
        self._do_merge(state=self.state, state2=self.state2)
        self._add_actions(state=self.state, actions=self.state2.actions)
        self._delete(state2=self.state2)
        self.actions['merged-podcast-state'] += 1

    @repeat_on_conflict(['state'])
    def _do_merge(self, state, state2):
        """Merges settings, disabled devices, ids and tags of state2 into
        state and saves it."""

        # overwrite settings in state2 with state's settings
        settings = state2.settings
        settings.update(state.settings)
        state.settings = settings

        state.disabled_devices = set_filter(None, state.disabled_devices,
            state2.disabled_devices)

        # exclude the target's own id from its merged ids (there is no
        # `state1` in scope; the target is `state`)
        state.merged_ids = set_filter(state._id, state.merged_ids,
            [state2._id], state2.merged_ids)

        state.tags = set_filter(None, state.tags, state2.tags)

        state.save()

    @repeat_on_conflict(['state'])
    def _add_actions(self, state, actions):
        try:
            state.add_actions(actions)
            state.save()
        except restkit.Unauthorized:
            # the merge could result in an invalid list of
            # subscribe/unsubscribe actions -- we ignore it and
            # just use the actions from state
            return

    @repeat_on_conflict(['state2'])
    def _delete(self, state2):
        state2.delete()
class EpisodeStateMerger(object):
    """ Merges state2 into state and deletes state2 """

    def __init__(self, state, state2, actions, dry_run=False):
        """
        :raises IncorrectMergeException: if both are the same state or they
            belong to different users
        """
        if state._id == state2._id:
            raise IncorrectMergeException("can't merge episode state into itself")

        if state.user != state2.user:
            raise IncorrectMergeException("states don't belong to the same user")

        self.state = state
        self.state2 = state2
        self.actions = actions
        self.dry_run = dry_run

    def merge(self):
        self._merge_obj(state=self.state, state2=self.state2)
        self._do_delete(state2=self.state2)
        self.actions['merge-episode-state'] += 1

    @repeat_on_conflict(['state'])
    def _merge_obj(self, state, state2):
        """Merges actions, settings, ids and chapters of state2 into state
        and saves it."""
        state.add_actions(state2.actions)

        # overwrite settings in state2 with state's settings
        settings = state2.settings
        settings.update(state.settings)
        state.settings = settings

        # same rule as the other mergers: union of both id lists plus
        # state2's id, without falsy values and without state's own id
        state.merged_ids = set_filter(state._id, state.merged_ids,
            [state2._id], state2.merged_ids)

        state.chapters = list(set(state.chapters + state2.chapters))

        state.save()

    @repeat_on_conflict(['state2'])
    def _do_delete(self, state2):
        state2.delete()
def set_filter(orig, *args):
    """Chain all iterables in *args*, de-duplicate them and return a list
    without falsy values and without *orig*.

    The order of the result follows set iteration order, i.e. it is arbitrary.
    """
    candidates = set(chain.from_iterable(args)) - set([orig])
    return [value for value in candidates if value]