1 from itertools
import chain
, imap
3 from functools
import partial
7 from mygpo
.core
.models
import Podcast
, Episode
, PodcastGroup
8 from mygpo
.users
.models
import PodcastUserState
, EpisodeUserState
9 from mygpo
import utils
10 from mygpo
.decorators
import repeat_on_conflict
13 class IncorrectMergeException(Exception):
17 def podcast_url_wrapper(r
):
20 if doc
['doc_type'] == 'Podcast':
21 obj
= Podcast
.wrap(doc
)
22 elif doc
['doc_type'] == 'PodcastGroup':
23 obj
= PodcastGroup
.wrap(doc
)
25 return obj
.get_podcast_by_url(url
)
27 def podcast_oldid_wrapper(r
):
30 if doc
['doc_type'] == 'Podcast':
31 obj
= Podcast
.wrap(doc
)
32 elif doc
['doc_type'] == 'PodcastGroup':
33 obj
= PodcastGroup
.wrap(doc
)
35 return obj
.get_podcast_by_oldid(oldid
)
38 def merge_objects(podcasts
=True, podcast_states
=False, episodes
=False,
39 episode_states
=False, dry_run
=False):
41 Merges objects (podcasts, episodes, states) based on different criteria
44 # The "smaller" podcast is merged into the "greater"
45 podcast_merge_order
= lambda a
, b
: cmp(a
.subscriber_count(), b
.subscriber_count())
46 no_merge_order
= lambda a
, b
: 0
48 merger
= partial(merge_from_iterator
, dry_run
=dry_run
,
49 progress_callback
=utils
.progress
)
54 print 'Merging Podcasts by URL'
55 podcasts
, total
= get_view_count_iter(Podcast
,
59 podcasts
= imap(podcast_url_wrapper
, podcasts
)
60 merger(podcasts
, similar_urls
, podcast_merge_order
, total
,
65 print 'Merging Podcasts by Old-Id'
66 podcasts
, total
= get_view_count_iter(Podcast
,
70 podcasts
= imap(podcast_oldid_wrapper
, podcasts
)
71 merger(podcasts
, similar_oldid
, podcast_merge_order
, total
,
77 print 'Merging Duplicate Podcast States'
78 states
, total
= get_view_count_iter(PodcastUserState
,
79 'podcast_states/by_user',
81 should_merge
= lambda a
, b
: a
== b
82 merger(states
, should_merge
, no_merge_order
, total
,
88 print 'Merging Episodes by URL'
89 episodes
, total
= get_view_count_iter(Episode
,
90 'episodes/by_podcast_url',
92 should_merge
= lambda a
, b
: a
.podcast
== b
.podcast
and \
94 merger(episodes
, should_merge
, no_merge_order
, total
, merge_episodes
)
98 print 'Merging Episodes by Old-Id'
99 episodes
, total
= get_view_count_iter(Episode
,
102 should_merge
= lambda a
, b
: a
.podcast
== b
.podcast
and \
104 merger(episodes
, should_merge
, no_merge_order
, total
, merge_episodes
)
109 print 'Merging Duplicate Episode States'
110 states
, total
= get_view_count_iter(EpisodeUserState
,
111 'episode_states/by_user_episode',
113 should_merge
= lambda a
, b
: (a
.user
, a
.episode
) == \
115 merger(states
, should_merge
, no_merge_order
, total
,
116 merge_episode_states
)
121 def get_view_count_iter(cls
, view
, *args
, **kwargs
):
122 iterator
= utils
.multi_request_view(cls
, view
, *args
, **kwargs
)
123 total
= cls
.view(view
, limit
=0).total_rows
124 return iterator
, total
127 def merge_from_iterator(obj_it
, should_merge
, cmp, total
, merge_func
,
128 dry_run
=False, progress_callback
=lambda *args
, **kwargs
: None):
130 Iterates over the objects in obj_it and calls should_merge for each pair of
131 objects. This implies that the objects returned by obj_it should be sorted
132 such that potential merge-candiates appear after each other.
134 If should_merge returns True, the pair of objects is going to be merged.
135 The smaller object (according to cmp) is merged into the larger one.
136 merge_func performs the actual merge. It is passed the two objects to be
137 merged (first the larger, then the smaller one).
140 obj_it
= iter(obj_it
)
144 except StopIteration:
147 for n
, p
in enumerate(obj_it
):
148 if should_merge(p
, prev
):
149 items
= sorted([p
, prev
], cmp=cmp)
150 logging
.info('merging {old} into {new}'.
151 format(old
=items
[1], new
=items
[0]))
153 merge_func(*items
, dry_run
=dry_run
)
156 progress_callback(n
, total
)
165 def merge_podcasts(podcast
, podcast2
, dry_run
=False):
167 Merges podcast2 into podcast
170 if podcast
== podcast2
:
171 raise IncorrectMergeException("can't merge podcast into itself")
173 @repeat_on_conflict(['podcast'], reload_f
=lambda p
:Podcast
.get(p
.get_id()))
174 def _do_merge(podcast
, podcast2
):
176 podcast
.merged_ids
= set_filter(podcast
.merged_ids
,
177 [podcast2
.get_id()], podcast2
.merged_ids
)
179 podcast
.merged_slugs
= set_filter(podcast
.merged_slugs
,
180 [podcast2
.slug
], podcast2
.merged_slugs
)
182 podcast
.merged_oldids
= set_filter(podcast
.merged_oldids
,
183 [podcast2
.oldid
], podcast2
.merged_oldids
)
185 # the first URL in the list represents the podcast main URL
186 main_url
= podcast
.url
187 podcast
.urls
= set_filter(podcast
.urls
, podcast2
.urls
)
188 # so we insert it as the first again
189 podcast
.urls
.remove(main_url
)
190 podcast
.urls
.insert(0, main_url
)
192 # we ignore related_podcasts because
193 # * the elements should be roughly the same
194 # * element order is important but could not preserved exactly
196 podcast
.content_types
= set_filter(podcast
.content_types
,
197 podcast2
.content_types
)
199 key
= lambda x
: x
.timestamp
200 for a
, b
in utils
.iterate_together(
201 [podcast
.subscribers
, podcast2
.subscribers
], key
):
203 if a
is None or b
is None: continue
205 # avoid increasing subscriber_count when merging
206 # duplicate entries of a single podcast
207 if a
.subscriber_count
== b
.subscriber_count
:
210 a
.subscriber_count
+= b
.subscriber_count
212 for src
, tags
in podcast2
.tags
.items():
213 podcast
.tags
[src
] = set_filter(podcast
.tags
.get(src
, []), tags
)
218 @repeat_on_conflict(['podcast2'])
219 def _do_delete(podcast2
):
223 # re-assign episodes to new podcast
224 # if necessary, they will be merged later anyway
225 for e
in podcast2
.get_episodes():
227 @repeat_on_conflict(['s'])
229 s
.podcast
= podcast
.get_id()
233 @repeat_on_conflict(['e'])
235 e
.podcast
= podcast
.get_id()
238 for s
in e
.get_all_states():
244 _do_merge(podcast
=podcast
, podcast2
=podcast2
)
245 merge_podcast_states_for_podcasts(podcast
, podcast2
, dry_run
=dry_run
)
246 _do_delete(podcast2
=podcast2
)
248 # Merge Episode States
249 no_merge_order
= lambda a
, b
: 0
250 episodes
= sorted(podcast
.get_episodes(), key
=lambda e
: e
.url
)
251 should_merge
= lambda a
, b
: a
.podcast
== b
.podcast
and similar_urls(a
, b
)
253 merge_from_iterator(episodes
, should_merge
, no_merge_order
, len(episodes
),
257 def similar_urls(a
, b
):
258 """ Two Podcasts/Episodes are merged, if they have the same URLs"""
259 return bool(utils
.intersect(a
.urls
, b
.urls
))
262 def similar_oldid(o1
, o2
):
263 """ Two Podcasts/Episodes are merged, if they have the same Old-IDs"""
264 return o1
.oldid
== o2
.oldid
and o1
.oldid
is not None
274 def merge_episodes(episode
, e
, dry_run
=False):
277 raise IncorrectMergeException("can't merge episode into itself")
279 episode
.urls
= set_filter(episode
.urls
, e
.urls
)
281 episode
.merged_ids
= set_filter(episode
.merged_ids
, [e
._id
],
284 episode
.merged_slugs
= set_filter(episode
.merged_slugs
, [e
.slug
],
287 @repeat_on_conflict(['e'])
291 @repeat_on_conflict(['episode'])
295 merge_episode_states_for_episodes(episode
, e
, dry_run
)
297 save(episode
=episode
)
304 # MERGING PODCAST STATES
308 def merge_podcast_states_for_podcasts(podcast
, podcast2
, dry_run
=False):
309 """Merges the Podcast states that are associated with the two Podcasts.
311 This should be done after two podcasts are merged
314 @repeat_on_conflict(['state2'])
315 def move(state2
, new_id
, new_url
):
316 state2
.ref_url
= new_url
317 state2
.podcast
= new_id
320 @repeat_on_conflict(['state2'])
324 key
= lambda x
: x
.user
325 states1
= podcast
.get_all_states()
326 states2
= podcast2
.get_all_states()
328 for state
, state2
in utils
.iterate_together([states1
, states2
], key
):
334 _move(state2
=state2
, new_id
=podcast
.get_id(), new_url
=podcast
.url
)
340 merge_podcast_states(state
, state2
)
341 delete(state2
=state2
)
344 def merge_podcast_states(state
, state2
):
345 """Merges the two given podcast states"""
347 if state
._id
== state2
._id
:
348 raise IncorrectMergeException("can't merge podcast state into itself")
350 if state
.user
!= state2
.user
:
351 raise IncorrectMergeException("states don't belong to the same user")
353 @repeat_on_conflict(['state'])
354 def _do_merge(state
, state2
):
356 # overwrite settings in state2 with state's settings
357 settings
= state2
.settings
358 settings
.update(state
.settings
)
359 state
.settings
= settings
361 state
.disabled_devices
= set_filter(state
.disabled_devices
,
362 state2
.disabled_devices
)
364 state
.merged_ids
= set_filter(state
.merged_ids
, [state2
._id
],
367 state
.tags
= set_filter(state
.tags
, state2
.tags
)
372 @repeat_on_conflict(['state'])
373 def _add_actions(state
, actions
):
375 state
.add_actions(actions
)
377 except restkit
.Unauthorized
:
378 # the merge could result in an invalid list of
379 # subscribe/unsubscribe actions -- we ignore it and
380 # just use the actions from state
383 @repeat_on_conflict(['state2'])
384 def _do_delete(state2
):
387 _do_merge(state
=state
, state2
=state2
)
389 _add_actions(state
=state
, actions
=state2
.actions
)
391 _do_delete(state2
=state2
)
396 # MERGING EPISODE STATES
401 def merge_episode_states_for_episodes(episode
, episode2
, dry_run
=False):
403 @repeat_on_conflict(['state2'])
404 def move(state2
, podcast_id
, episode_id
):
405 state2
.podcast
= podcast_id
406 state2
.episode
= episode_id
409 key
= lambda x
: x
.user
410 states1
= episode
.get_all_states()
411 states2
= episode2
.get_all_states()
413 for state
, state2
in utils
.iterate_together([states1
, states2
], key
):
419 _move(state2
=state2
, podcast_id
=episode
.podcast
,
420 episode_id
=episode
._id
)
426 merge_episode_states(state
, state2
)
430 def merge_episode_states(state
, state2
):
431 """ Merges state2 in state """
433 if state
._id
== state2
._id
:
434 raise IncorrectMergeException("can't merge episode state into itself")
436 if state
.user
!= state2
.user
:
437 raise IncorrectMergeException("states don't belong to the same user")
440 @repeat_on_conflict(['state'])
441 def _do_update(state
, state2
):
442 state
.add_actions(state2
.actions
)
444 # overwrite settings in state2 with state's settings
445 settings
= state2
.settings
446 settings
.update(state
.settings
)
447 state
.settings
= settings
449 merged_ids
= set(state
.merged_ids
+ [state2
._id
] + state2
.merged_ids
)
450 state
.merged_ids
= filter(None, merged_ids
)
452 state
.chapters
= list(set(state
.chapters
+ state2
.chapters
))
456 @repeat_on_conflict(['state2'])
457 def _do_delete(state2
):
460 _do_update(state
=state
, state2
=state2
)
461 _do_delete(state2
=state2
)
464 def set_filter(*args
):
465 return filter(None, set(chain
.from_iterable(args
)))