create "touch-couchdb-views" mgmt cmd
[mygpo.git] / mygpo / maintenance / merge.py
blobe5600fd026f6cda5648d5308aa1c4e706172fc25
1 from itertools import chain, imap
2 import logging
3 from functools import partial
5 import restkit
7 from mygpo.core.models import Podcast, Episode, PodcastGroup
8 from mygpo.users.models import PodcastUserState, EpisodeUserState
9 from mygpo import utils
10 from mygpo.decorators import repeat_on_conflict
class IncorrectMergeException(Exception):
    """Raised when a requested merge must not be performed, e.g. when an
    object would be merged into itself or when two user states belong to
    different users."""
def podcast_url_wrapper(r):
    """
    Turns a row of the 'podcasts/by_url' view into the podcast for its URL.

    :param r: a view row whose 'key' is the URL and whose 'doc' is a raw
        Podcast or PodcastGroup document
    :returns: the result of get_podcast_by_url(url) on the wrapped document
    :raises ValueError: if the document is neither a Podcast nor a
        PodcastGroup
    """
    url = r['key']
    doc = r['doc']
    doc_type = doc['doc_type']

    if doc_type == 'Podcast':
        obj = Podcast.wrap(doc)
    elif doc_type == 'PodcastGroup':
        obj = PodcastGroup.wrap(doc)
    else:
        # without this branch 'obj' would be unbound and the caller would
        # only see an UnboundLocalError
        raise ValueError('unexpected doc_type %r for URL %r' % (doc_type, url))

    return obj.get_podcast_by_url(url)
def podcast_oldid_wrapper(r):
    """
    Turns a row of the 'podcasts/by_oldid' view into the podcast for its
    old id.

    :param r: a view row whose 'key' is the old id and whose 'doc' is a raw
        Podcast or PodcastGroup document
    :returns: the result of get_podcast_by_oldid(oldid) on the wrapped
        document
    :raises ValueError: if the document is neither a Podcast nor a
        PodcastGroup
    """
    oldid = r['key']
    doc = r['doc']
    doc_type = doc['doc_type']

    if doc_type == 'Podcast':
        obj = Podcast.wrap(doc)
    elif doc_type == 'PodcastGroup':
        obj = PodcastGroup.wrap(doc)
    else:
        # without this branch 'obj' would be unbound and the caller would
        # only see an UnboundLocalError
        raise ValueError('unexpected doc_type %r for old id %r' % (doc_type, oldid))

    return obj.get_podcast_by_oldid(oldid)
def merge_objects(podcasts=True, podcast_states=False, episodes=False,
        episode_states=False, dry_run=False):
    """
    Merges objects (podcasts, episodes, states) based on different criteria.

    Each enabled pass prints a header, reads a CouchDB view and hands its
    rows to merge_from_iterator.

    :param podcasts: merge podcasts sharing a URL, then podcasts sharing an
        old id
    :param podcast_states: merge duplicate podcast states
    :param episodes: merge episodes of the same podcast sharing a URL, then
        episodes of the same podcast sharing an old id
    :param episode_states: merge episode states of the same user and episode
    :param dry_run: passed on to merge_from_iterator
    """

    def by_subscribers(a, b):
        # podcasts are ordered by subscriber count; merge_from_iterator uses
        # this order to decide which of two podcasts is kept
        return cmp(a.subscriber_count(), b.subscriber_count())

    def unordered(a, b):
        return 0

    def within_podcast(similar):
        # episodes are only merged if they belong to the same podcast
        return lambda a, b: a.podcast == b.podcast and similar(a, b)

    run_merge = partial(merge_from_iterator, dry_run=dry_run,
                        progress_callback=utils.progress)

    if podcasts:
        print('Merging Podcasts by URL')
        rows, total = get_view_count_iter(Podcast, 'podcasts/by_url',
                                          wrap=False, include_docs=True)
        run_merge(imap(podcast_url_wrapper, rows), similar_urls,
                  by_subscribers, total, merge_podcasts)
        print('')

        print('Merging Podcasts by Old-Id')
        rows, total = get_view_count_iter(Podcast, 'podcasts/by_oldid',
                                          wrap=False, include_docs=True)
        run_merge(imap(podcast_oldid_wrapper, rows), similar_oldid,
                  by_subscribers, total, merge_podcasts)
        print('')

    if podcast_states:
        print('Merging Duplicate Podcast States')
        rows, total = get_view_count_iter(PodcastUserState,
                                          'podcast_states/by_user',
                                          include_docs=True)
        # NOTE(review): compares whole state objects for equality; confirm
        # that duplicate states actually compare equal
        run_merge(rows, lambda a, b: a == b, unordered, total,
                  merge_podcast_states)
        print('')

    if episodes:
        print('Merging Episodes by URL')
        rows, total = get_view_count_iter(Episode, 'episodes/by_podcast_url',
                                          include_docs=True)
        run_merge(rows, within_podcast(similar_urls), unordered, total,
                  merge_episodes)
        print('')

        print('Merging Episodes by Old-Id')
        rows, total = get_view_count_iter(Episode, 'episodes/by_oldid',
                                          include_docs=True)
        run_merge(rows, within_podcast(similar_oldid), unordered, total,
                  merge_episodes)
        print('')

    if episode_states:
        print('Merging Duplicate Episode States')
        rows, total = get_view_count_iter(EpisodeUserState,
                                          'episode_states/by_user_episode',
                                          include_docs=True)
        same_user_episode = lambda a, b: (a.user, a.episode) == (b.user, b.episode)
        run_merge(rows, same_user_episode, unordered, total,
                  merge_episode_states)
        print('')
def get_view_count_iter(cls, view, *args, **kwargs):
    """
    Returns a pair (rows, total) for the given view.

    rows comes from utils.multi_request_view, which receives all extra
    arguments. total is the view's total_rows, obtained by a separate
    limit=0 query that does not see the extra arguments.
    """
    return (utils.multi_request_view(cls, view, *args, **kwargs),
            cls.view(view, limit=0).total_rows)
def merge_from_iterator(obj_it, should_merge, cmp, total, merge_func,
        dry_run=False, progress_callback=lambda *args, **kwargs: None):
    """
    Iterates over the objects in obj_it and calls should_merge for each pair
    of adjacent objects. This implies that the objects returned by obj_it
    should be sorted such that potential merge-candidates appear after each
    other.

    If should_merge returns True, the pair of objects is going to be merged.
    The smaller object (according to cmp) is merged into the larger one.
    merge_func performs the actual merge. It is passed the two objects to be
    merged (first the larger, then the smaller one) and the dry_run flag.
    If cmp considers both objects equal, the earlier object is merged into
    the later one.

    After a merge, the object that was kept is compared with the next
    object. This way, chains of more than two duplicates all end up in one
    object instead of being compared against an object that was merged away.

    :param dry_run: if True, merges are only logged and merge_func is not
        called
    :param progress_callback: called as progress_callback(n, total) after
        each compared pair, with n counting from 0
    """
    from functools import cmp_to_key

    obj_it = iter(obj_it)

    try:
        prev = next(obj_it)
    except StopIteration:
        return

    key = cmp_to_key(cmp)

    for n, p in enumerate(obj_it):
        if should_merge(p, prev):
            # descending order: items[0] is kept, items[1] is merged into it.
            # sorted() stays stable with reverse=True, so equal objects keep
            # the order [p, prev].
            items = sorted([p, prev], key=key, reverse=True)
            logging.info('merging {old} into {new}'.
                    format(old=items[1], new=items[0]))

            if not dry_run:
                merge_func(*items, dry_run=dry_run)

            # continue with the object that survives the merge
            prev = items[0]
        else:
            prev = p

        progress_callback(n, total)
161 # MERGING PODCASTS
def merge_podcasts(podcast, podcast2, dry_run=False):
    """
    Merges podcast2 into podcast

    The episodes of podcast2 (and their states) are re-assigned to podcast,
    the metadata of podcast2 is combined into podcast, the podcast states of
    both podcasts are merged, podcast2 is deleted, and finally the episodes
    of podcast that share a URL are merged.

    :param podcast: the podcast that is kept
    :param podcast2: the podcast that is merged into podcast and deleted
    :param dry_run: if True, only the arguments are validated; nothing is
        written
    :raises IncorrectMergeException: if podcast and podcast2 are equal
    """

    if podcast == podcast2:
        raise IncorrectMergeException("can't merge podcast into itself")

    if dry_run:
        return

    @repeat_on_conflict(['podcast'], reload_f=lambda p:Podcast.get(p.get_id()))
    def _do_merge(podcast, podcast2):

        podcast.merged_ids = set_filter(podcast.merged_ids,
                [podcast2.get_id()], podcast2.merged_ids)

        podcast.merged_slugs = set_filter(podcast.merged_slugs,
                [podcast2.slug], podcast2.merged_slugs)

        podcast.merged_oldids = set_filter(podcast.merged_oldids,
                [podcast2.oldid], podcast2.merged_oldids)

        # the first URL in the list represents the podcast main URL
        main_url = podcast.url
        podcast.urls = set_filter(podcast.urls, podcast2.urls)
        # so we insert it as the first again; list.remove() would raise
        # ValueError if the main URL were missing from the merged list
        if main_url:
            if main_url in podcast.urls:
                podcast.urls.remove(main_url)
            podcast.urls.insert(0, main_url)

        # we ignore related_podcasts because
        # * the elements should be roughly the same
        # * element order is important but could not preserved exactly

        podcast.content_types = set_filter(podcast.content_types,
                podcast2.content_types)

        key = lambda x: x.timestamp
        for a, b in utils.iterate_together(
                [podcast.subscribers, podcast2.subscribers], key):

            if a is None or b is None: continue

            # avoid increasing subscriber_count when merging
            # duplicate entries of a single podcast
            if a.subscriber_count == b.subscriber_count:
                continue

            a.subscriber_count += b.subscriber_count

        for src, tags in podcast2.tags.items():
            podcast.tags[src] = set_filter(podcast.tags.get(src, []), tags)

        podcast.save()

    @repeat_on_conflict(['podcast2'])
    def _do_delete(podcast2):
        podcast2.delete()

    @repeat_on_conflict(['s'])
    def _save_state(s):
        s.podcast = podcast.get_id()
        s.save()

    @repeat_on_conflict(['e'])
    def _save_episode(e):
        e.podcast = podcast.get_id()
        e.save()

    # re-assign episodes (and their states) to the new podcast
    # if necessary, they will be merged later anyway
    for e in podcast2.get_episodes():
        for s in e.get_all_states():
            _save_state(s=s)

        _save_episode(e=e)

    _do_merge(podcast=podcast, podcast2=podcast2)
    merge_podcast_states_for_podcasts(podcast, podcast2, dry_run=dry_run)
    _do_delete(podcast2=podcast2)

    # merge episodes of the merged podcast that share a URL
    no_merge_order = lambda a, b: 0
    episodes = sorted(podcast.get_episodes(), key=lambda e: e.url)
    should_merge = lambda a, b: a.podcast == b.podcast and similar_urls(a, b)

    merge_from_iterator(episodes, should_merge, no_merge_order, len(episodes),
            merge_episodes, dry_run=dry_run)
def similar_urls(a, b):
    """ Two Podcasts/Episodes are merged if their URL lists intersect.

    :returns: True if utils.intersect(a.urls, b.urls) is truthy
    """
    common_urls = utils.intersect(a.urls, b.urls)
    return bool(common_urls)
def similar_oldid(o1, o2):
    """ Two Podcasts/Episodes are merged if they share the same, non-None
    Old-ID """
    if o1.oldid is None:
        return False
    return o1.oldid == o2.oldid
269 # MERGING EPISODES
def merge_episodes(episode, e, dry_run=False):
    """
    Merges episode e into episode and deletes e.

    The URLs, merged ids and merged slugs of e are combined into episode,
    and the episode states of both episodes are merged.

    :param episode: the episode that is kept
    :param e: the episode that is merged into episode and deleted
    :param dry_run: if True, only the arguments are validated; nothing is
        written
    :raises IncorrectMergeException: if episode and e are equal
    """

    if episode == e:
        raise IncorrectMergeException("can't merge episode into itself")

    if dry_run:
        return

    @repeat_on_conflict(['episode'])
    def _do_merge(episode, e):
        # the fields are combined inside the retried function (as in
        # merge_podcasts), so that a retry after a conflict re-applies them
        # to whatever document repeat_on_conflict passes in again
        # (presumably a reloaded one)
        episode.urls = set_filter(episode.urls, e.urls)

        episode.merged_ids = set_filter(episode.merged_ids, [e._id],
                e.merged_ids)

        episode.merged_slugs = set_filter(episode.merged_slugs, [e.slug],
                e.merged_slugs)

        episode.save()

    @repeat_on_conflict(['e'])
    def _do_delete(e):
        e.delete()

    merge_episode_states_for_episodes(episode, e, dry_run)

    _do_merge(episode=episode, e=e)
    _do_delete(e=e)
304 # MERGING PODCAST STATES
def merge_podcast_states_for_podcasts(podcast, podcast2, dry_run=False):
    """Merges the Podcast states that are associated with the two Podcasts.

    This should be done after two podcasts are merged.

    The states of both podcasts are paired up by user via
    utils.iterate_together. A state that exists only for podcast2 is moved
    to podcast (its podcast id and ref_url are updated). If both podcasts
    have a state for the same user, the state of podcast2 is merged into
    the one of podcast by merge_podcast_states, which also deletes it.

    :param dry_run: if True, nothing is written
    """
    if dry_run:
        return

    @repeat_on_conflict(['state2'])
    def move(state2, new_id, new_url):
        state2.ref_url = new_url
        state2.podcast = new_id
        state2.save()

    key = lambda x: x.user
    states1 = podcast.get_all_states()
    states2 = podcast2.get_all_states()

    for state, state2 in utils.iterate_together([states1, states2], key):

        if state == state2:
            continue

        if state is None:
            move(state2=state2, new_id=podcast.get_id(), new_url=podcast.url)

        elif state2 is None:
            continue

        else:
            # merge_podcast_states deletes state2 itself; deleting it here a
            # second time would operate on an already deleted document
            merge_podcast_states(state, state2)
def merge_podcast_states(state, state2, dry_run=False):
    """Merges the two given podcast states

    state2 is merged into state and then deleted. The settings of state take
    precedence over those of state2; disabled devices, merged ids and tags
    are combined; the actions of state2 are added to state.

    :param dry_run: if True, only the arguments are validated; nothing is
        written. Accepted because merge_from_iterator always passes it to
        its merge_func.
    :raises IncorrectMergeException: if both states have the same _id or
        belong to different users
    """

    if state._id == state2._id:
        raise IncorrectMergeException("can't merge podcast state into itself")

    if state.user != state2.user:
        raise IncorrectMergeException("states don't belong to the same user")

    if dry_run:
        return

    @repeat_on_conflict(['state'])
    def _do_merge(state, state2):

        # overwrite settings in state2 with state's settings
        settings = state2.settings
        settings.update(state.settings)
        state.settings = settings

        state.disabled_devices = set_filter(state.disabled_devices,
                state2.disabled_devices)

        state.merged_ids = set_filter(state.merged_ids, [state2._id],
                state2.merged_ids)

        state.tags = set_filter(state.tags, state2.tags)

        state.save()

    @repeat_on_conflict(['state'])
    def _add_actions(state, actions):
        try:
            state.add_actions(actions)
            state.save()
        except restkit.Unauthorized:
            # the merge could result in an invalid list of
            # subscribe/unsubscribe actions -- we ignore it and
            # just use the actions from state
            return

    @repeat_on_conflict(['state2'])
    def _do_delete(state2):
        state2.delete()

    _do_merge(state=state, state2=state2)

    _add_actions(state=state, actions=state2.actions)

    _do_delete(state2=state2)
396 # MERGING EPISODE STATES
def merge_episode_states_for_episodes(episode, episode2, dry_run=False):
    """Merges the Episode states that are associated with the two Episodes.

    The states of both episodes are paired up by user via
    utils.iterate_together. A state that exists only for episode2 is moved
    to episode (its podcast and episode ids are updated). If both episodes
    have a state for the same user, the state of episode2 is merged into
    the one of episode by merge_episode_states, which also deletes it.

    :param dry_run: if True, nothing is written
    """
    if dry_run:
        return

    @repeat_on_conflict(['state2'])
    def move(state2, podcast_id, episode_id):
        state2.podcast = podcast_id
        state2.episode = episode_id
        state2.save()

    key = lambda x: x.user
    states1 = episode.get_all_states()
    states2 = episode2.get_all_states()

    for state, state2 in utils.iterate_together([states1, states2], key):

        if state == state2:
            continue

        if state is None:
            move(state2=state2, podcast_id=episode.podcast,
                    episode_id=episode._id)

        elif state2 is None:
            continue

        else:
            merge_episode_states(state, state2)
def merge_episode_states(state, state2, dry_run=False):
    """ Merges state2 in state

    The actions of state2 are added to state, the settings of state take
    precedence over those of state2, and merged ids and chapters are
    combined. Afterwards state2 is deleted.

    :param dry_run: if True, only the arguments are validated; nothing is
        written. Accepted because merge_from_iterator always passes it to
        its merge_func.
    :raises IncorrectMergeException: if both states have the same _id or
        belong to different users
    """

    if state._id == state2._id:
        raise IncorrectMergeException("can't merge episode state into itself")

    if state.user != state2.user:
        raise IncorrectMergeException("states don't belong to the same user")

    if dry_run:
        return

    @repeat_on_conflict(['state'])
    def _do_update(state, state2):
        state.add_actions(state2.actions)

        # overwrite settings in state2 with state's settings
        settings = state2.settings
        settings.update(state.settings)
        state.settings = settings

        # same de-duplication of merged ids as in merge_podcast_states
        state.merged_ids = set_filter(state.merged_ids, [state2._id],
                state2.merged_ids)

        state.chapters = list(set(state.chapters + state2.chapters))

        state.save()

    @repeat_on_conflict(['state2'])
    def _do_delete(state2):
        state2.delete()

    _do_update(state=state, state2=state2)
    _do_delete(state2=state2)
def set_filter(*args):
    """
    Concatenates the given iterables, drops falsy values (None, '', 0, ...)
    and removes duplicates.

    The result is always a list, because callers modify it in place
    (.remove / .insert). Its order is the first-seen order, so that e.g. the
    order of a podcast's URLs survives a merge. The elements must be
    hashable.
    """
    seen = set()
    result = []
    for item in chain.from_iterable(args):
        if item and item not in seen:
            seen.add(item)
            result.append(item)
    return result