1 from itertools
import chain
, imap
as map
3 from functools
import partial
7 from mygpo
.core
.models
import Podcast
, Episode
, PodcastGroup
8 from mygpo
.users
.models
import PodcastUserState
, EpisodeUserState
9 from mygpo
import utils
10 from mygpo
.decorators
import repeat_on_conflict
11 from mygpo
.db
.couchdb
.episode
import episodes_for_podcast
12 from mygpo
.db
.couchdb
.podcast_state
import all_podcast_states
13 from mygpo
.db
.couchdb
.episode_state
import all_episode_states
14 from mygpo
.db
.couchdb
.utils
import multi_request_view
17 class IncorrectMergeException(Exception):
21 def podcast_url_wrapper(r
):
24 if doc
['doc_type'] == 'Podcast':
25 obj
= Podcast
.wrap(doc
)
26 elif doc
['doc_type'] == 'PodcastGroup':
27 obj
= PodcastGroup
.wrap(doc
)
29 return obj
.get_podcast_by_url(url
)
32 def merge_objects(podcasts
=True, podcast_states
=False, episodes
=False,
33 episode_states
=False, dry_run
=False):
35 Merges objects (podcasts, episodes, states) based on different criteria
38 # The "smaller" podcast is merged into the "greater"
39 podcast_merge_order
= lambda a
, b
: cmp(a
.subscriber_count(), b
.subscriber_count())
40 no_merge_order
= lambda a
, b
: 0
42 merger
= partial(merge_from_iterator
, dry_run
=dry_run
,
43 progress_callback
=utils
.progress
)
48 print 'Merging Podcasts by URL'
49 podcasts
, total
= get_view_count_iter(Podcast
,
53 podcasts
= map(podcast_url_wrapper
, podcasts
)
54 merger(podcasts
, similar_urls
, podcast_merge_order
, total
,
59 print 'Merging Podcasts by Old-Id'
60 podcasts
, total
= get_view_count_iter(Podcast
,
64 podcasts
= imap(podcast_oldid_wrapper
, podcasts
)
65 merger(podcasts
, similar_oldid
, podcast_merge_order
, total
,
71 print 'Merging Duplicate Podcast States'
72 states
, total
= get_view_count_iter(PodcastUserState
,
73 'podcast_states/by_user',
75 should_merge
= lambda a
, b
: a
== b
76 merger(states
, should_merge
, no_merge_order
, total
,
82 print 'Merging Episodes by URL'
83 episodes
, total
= get_view_count_iter(Episode
,
84 'episodes/by_podcast_url',
86 should_merge
= lambda a
, b
: a
.podcast
== b
.podcast
and \
88 merger(episodes
, should_merge
, no_merge_order
, total
, merge_episodes
)
92 print 'Merging Episodes by Old-Id'
93 episodes
, total
= get_view_count_iter(Episode
,
96 should_merge
= lambda a
, b
: a
.podcast
== b
.podcast
and \
98 merger(episodes
, should_merge
, no_merge_order
, total
, merge_episodes
)
103 print 'Merging Duplicate Episode States'
104 states
, total
= get_view_count_iter(EpisodeUserState
,
105 'episode_states/by_user_episode',
107 should_merge
= lambda a
, b
: (a
.user
, a
.episode
) == \
109 merger(states
, should_merge
, no_merge_order
, total
,
110 merge_episode_states
)
def get_view_count_iter(cls, view, *args, **kwargs):
    """ Returns an iterator over a view's results and the view's row count

    The iterator is obtained from multi_request_view, which receives cls,
    view and all additional positional and keyword arguments. The count is
    the total_rows attribute of a separate cls.view() query that is issued
    with limit=0; the additional arguments are not passed to that query.

    Returns a tuple (iterator, total)
    """
    results = multi_request_view(cls, view, *args, **kwargs)

    # limit=0 requests no rows; only total_rows is read from the result
    count_query = cls.view(view, limit=0)
    row_count = count_query.total_rows

    return results, row_count
121 def merge_from_iterator(obj_it
, should_merge
, cmp, total
, merge_func
,
122 dry_run
=False, progress_callback
=lambda *args
, **kwargs
: None):
124 Iterates over the objects in obj_it and calls should_merge for each pair of
125 objects. This implies that the objects returned by obj_it should be sorted
126 such that potential merge-candidates appear after each other.
128 If should_merge returns True, the pair of objects is going to be merged.
129 The smaller object (according to cmp) is merged into the larger one.
130 merge_func performs the actual merge. It is passed the two objects to be
131 merged (first the larger, then the smaller one).
134 obj_it
= iter(obj_it
)
138 except StopIteration:
141 for n
, p
in enumerate(obj_it
):
142 if should_merge(p
, prev
):
143 items
= sorted([p
, prev
], cmp=cmp)
144 logging
.info('merging {old} into {new}'.
145 format(old
=items
[1], new
=items
[0]))
147 merge_func(*items
, dry_run
=dry_run
)
150 progress_callback(n
, total
)
154 class PodcastMerger(object):
155 """ Merges podcast2 into podcast
157 Also merges the related podcast states, and re-assigns podcast2's episodes
158 to podcast, but merges neither their episodes nor their episode states
162 def __init__(self
, podcasts
, actions
, groups
, dry_run
=False):
164 for n
, podcast1
in enumerate(podcasts
):
165 for m
, podcast2
in enumerate(podcasts
):
166 if podcast1
== podcast2
and n
!= m
:
167 raise IncorrectMergeException("can't merge podcast into itself")
169 self
.podcasts
= podcasts
170 self
.actions
= actions
172 self
.dry_run
= dry_run
176 podcast1
= self
.podcasts
.pop(0)
178 for podcast2
in self
.podcasts
:
179 self
._merge
_objs
(podcast1
=podcast1
, podcast2
=podcast2
)
180 self
.merge_states(podcast1
, podcast2
)
181 self
.merge_episodes()
182 self
.reassign_episodes(podcast1
, podcast2
)
183 self
._delete
(podcast2
=podcast2
)
185 self
.actions
['merge-podcast'] += 1
188 def merge_episodes(self
):
189 for n
, episodes
in self
.groups
:
191 episode
= episodes
.pop(0)
195 em
= EpisodeMerger(episode
, ep
, self
.actions
)
199 @repeat_on_conflict(['podcast1', 'podcast2'])
200 def _merge_objs(self
, podcast1
, podcast2
):
202 podcast1
.merged_ids
= set_filter(podcast1
.get_id(),
203 podcast1
.merged_ids
, [podcast2
.get_id()], podcast2
.merged_ids
)
205 podcast1
.merged_slugs
= set_filter(podcast1
.slug
,
206 podcast1
.merged_slugs
, [podcast2
.slug
], podcast2
.merged_slugs
)
208 podcast1
.merged_oldids
= set_filter(podcast1
.oldid
,
209 podcast1
.merged_oldids
, [podcast2
.oldid
],
210 podcast2
.merged_oldids
)
212 # the first URL in the list represents the podcast main URL
213 main_url
= podcast1
.url
214 podcast1
.urls
= set_filter(None, podcast1
.urls
, podcast2
.urls
)
215 # so we insert it as the first again
216 podcast1
.urls
.remove(main_url
)
217 podcast1
.urls
.insert(0, main_url
)
219 # we ignore related_podcasts because
220 # * the elements should be roughly the same
221 # * element order is important but could not preserved exactly
223 podcast1
.content_types
= set_filter(None, podcast1
.content_types
,
224 podcast2
.content_types
)
226 key
= lambda x
: x
.timestamp
227 for a
, b
in utils
.iterate_together(
228 [podcast1
.subscribers
, podcast2
.subscribers
], key
):
230 if a
is None or b
is None:
233 # avoid increasing subscriber_count when merging
234 # duplicate entries of a single podcast
235 if a
.subscriber_count
== b
.subscriber_count
:
238 a
.subscriber_count
+= b
.subscriber_count
240 for src
, tags
in podcast2
.tags
.items():
241 podcast1
.tags
[src
] = set_filter(None, podcast1
.tags
.get(src
, []),
247 @repeat_on_conflict(['podcast2'])
248 def _delete(self
, podcast2
):
252 @repeat_on_conflict(['s'])
253 def _save_state(self
, s
, podcast1
):
254 s
.podcast
= podcast1
.get_id()
258 @repeat_on_conflict(['e'])
259 def _save_episode(self
, e
, podcast1
):
260 e
.podcast
= podcast1
.get_id()
def reassign_episodes(self, podcast1, podcast2):
    """ Re-assigns the episodes of podcast2, and their states, to podcast1

    Every episode returned by episodes_for_podcast(podcast2) is handed to
    _save_episode, and every state returned by all_episode_states for such
    an episode is handed to _save_state. The states of an episode are
    processed before the episode itself.

    Nothing is merged here; if necessary, the re-assigned episodes are
    merged later anyway. The 'reassign-episode' and
    'reassign-episode-state' counters in self.actions are incremented
    once per processed object.
    """
    counters = self.actions

    for episode in episodes_for_podcast(podcast2):
        counters['reassign-episode'] += 1

        for episode_state in all_episode_states(episode):
            counters['reassign-episode-state'] += 1
            # keyword arguments are kept as in the decorated signatures
            self._save_state(s=episode_state, podcast1=podcast1)

        self._save_episode(e=episode, podcast1=podcast1)
279 def merge_states(self
, podcast1
, podcast2
):
280 """Merges the Podcast states that are associated with the two Podcasts.
282 This should be done after two podcasts are merged
285 key
= lambda x
: x
.user
286 states1
= sorted(all_podcast_states(podcast1
), key
=key
)
287 states2
= sorted(all_podcast_states(podcast2
), key
=key
)
289 for state
, state2
in utils
.iterate_together([states1
, states2
], key
):
295 self
.actions
['move-podcast-state'] += 1
296 self
._move
_state
(state2
=state2
, new_id
=podcast1
.get_id(),
297 new_url
=podcast1
.url
)
303 psm
= PodcastStateMerger(state
, state2
, self
.actions
)
307 @repeat_on_conflict(['state2'])
308 def _move_state(self
, state2
, new_id
, new_url
):
309 state2
.ref_url
= new_url
310 state2
.podcast
= new_id
313 @repeat_on_conflict(['state2'])
314 def _delete_state(state2
):
def similar_urls(a, b):
    """ Merge criterion: True if a and b have URLs in common

    Both objects need to provide a ``urls`` attribute. The result is the
    truth value of utils.intersect() applied to the two URL lists.
    """
    shared_urls = utils.intersect(a.urls, b.urls)
    return bool(shared_urls)
329 class EpisodeMerger(object):
332 def __init__(self
, episode1
, episode2
, actions
, dry_run
=False):
333 if episode1
== episode2
:
334 raise IncorrectMergeException("can't merge episode into itself")
336 self
.episode1
= episode1
337 self
.episode2
= episode2
338 self
.actions
= actions
339 self
.dry_run
= dry_run
343 self
._merge
_objs
(episode1
=self
.episode1
, episode2
=self
.episode2
)
344 self
.merge_states(self
.episode1
, self
.episode2
)
345 self
._delete
(e
=self
.episode2
)
346 self
.actions
['merge-episode'] += 1
349 @repeat_on_conflict(['episode1'])
350 def _merge_objs(self
, episode1
, episode2
):
352 episode1
.urls
= set_filter(None, episode1
.urls
, episode2
.urls
)
354 episode1
.merged_ids
= set_filter(episode1
._id
, episode1
.merged_ids
,
355 [episode2
._id
], episode2
.merged_ids
)
357 episode1
.merged_slugs
= set_filter(episode1
.slug
,
358 episode1
.merged_slugs
, [episode2
.slug
], episode2
.merged_slugs
)
363 @repeat_on_conflict(['e'])
364 def _delete(self
, e
):
368 def merge_states(self
, episode
, episode2
):
370 key
= lambda x
: x
.user
371 states1
= sorted(all_episode_states(self
.episode1
), key
=key
)
372 states2
= sorted(all_episode_states(self
.episode2
), key
=key
)
374 for state
, state2
in utils
.iterate_together([states1
, states2
], key
):
380 self
.actions
['move-episode-state'] += 1
381 self
._move
(state2
=state2
, podcast_id
=self
.episode1
.podcast
,
382 episode_id
=self
.episode1
._id
)
388 esm
= EpisodeStateMerger(state
, state2
, self
.actions
)
392 @repeat_on_conflict(['state2'])
393 def _move(self
, state2
, podcast_id
, episode_id
):
394 state2
.podcast
= podcast_id
395 state2
.episode
= episode_id
403 class PodcastStateMerger(object):
404 """Merges the two given podcast states"""
406 def __init__(self
, state
, state2
, actions
, dry_run
=False):
408 if state
._id
== state2
._id
:
409 raise IncorrectMergeException("can't merge podcast state into itself")
411 if state
.user
!= state2
.user
:
412 raise IncorrectMergeException("states don't belong to the same user")
416 self
.actions
= actions
417 self
.dry_run
= dry_run
421 self
._do
_merge
(state
=self
.state
, state2
=self
.state2
)
422 self
._add
_actions
(state
=self
.state
, actions
=self
.state2
.actions
)
423 self
._delete
(state2
=self
.state2
)
424 self
.actions
['merged-podcast-state'] += 1
@repeat_on_conflict(['state'])
def _do_merge(self, state, state2):
    """ Merges the fields of state2 into state

    settings: state2's settings are updated with state's settings, so on
        a conflicting key the value from state wins; the resulting dict
        is assigned to state.settings
    disabled_devices, tags: combined from both states via set_filter
    merged_ids: state.merged_ids, state2's own id and state2.merged_ids,
        combined via set_filter with state's own id as the value to
        exclude

    state is modified in place.
    """

    # overwrite settings in state2 with state's settings
    settings = state2.settings
    settings.update(state.settings)
    state.settings = settings

    state.disabled_devices = set_filter(None, state.disabled_devices,
            state2.disabled_devices)

    # The id of the surviving state is excluded from its own merged_ids.
    # This used to reference ``state1``, which is not defined in this
    # scope and raised a NameError as soon as the method was called.
    state.merged_ids = set_filter(state._id, state.merged_ids,
            [state2._id], state2.merged_ids)

    state.tags = set_filter(None, state.tags, state2.tags)

    # NOTE(review): no call that persists ``state`` is visible in the
    # reviewed excerpt of this method -- confirm that the merged state
    # is saved.
446 @repeat_on_conflict(['state'])
447 def _add_actions(self
, state
, actions
):
449 state
.add_actions(actions
)
451 except restkit
.Unauthorized
:
452 # the merge could result in an invalid list of
453 # subscribe/unsubscribe actions -- we ignore it and
454 # just use the actions from state
457 @repeat_on_conflict(['state2'])
458 def _delete(self
, state2
):
465 class EpisodeStateMerger(object):
466 """ Merges state2 in state """
468 def __init__(self
, state
, state2
, actions
, dry_run
=False):
470 if state
._id
== state2
._id
:
471 raise IncorrectMergeException("can't merge episode state into itself")
473 if state
.user
!= state2
.user
:
474 raise IncorrectMergeException("states don't belong to the same user")
478 self
.actions
= actions
479 self
.dry_run
= dry_run
483 self
._merge
_obj
(state
=self
.state
, state2
=self
.state2
)
484 self
._do
_delete
(state2
=self
.state2
)
485 self
.actions
['merge-episode-state'] += 1
488 @repeat_on_conflict(['state'])
489 def _merge_obj(self
, state
, state2
):
490 state
.add_actions(state2
.actions
)
492 # overwrite settings in state2 with state's settings
493 settings
= state2
.settings
494 settings
.update(state
.settings
)
495 state
.settings
= settings
497 merged_ids
= set(state
.merged_ids
+ [state2
._id
] + state2
.merged_ids
)
498 state
.merged_ids
= filter(None, merged_ids
)
500 state
.chapters
= list(set(state
.chapters
+ state2
.chapters
))
504 @repeat_on_conflict(['state2'])
505 def _do_delete(self
, state2
):
509 def set_filter(orig
, *args
):
510 """ chain args, and remove falsy values and orig """
511 s
= set(chain
.from_iterable(args
))