2 # This file is part of my.gpodder.org.
4 # my.gpodder.org is free software: you can redistribute it and/or modify it
5 # under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or (at your
7 # option) any later version.
9 # my.gpodder.org is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
12 # License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with my.gpodder.org. If not, see <http://www.gnu.org/licenses/>.
19 from datetime
import datetime
21 from collections
import Counter
23 from django
.test
import TestCase
, TransactionTestCase
24 from django
.test
.utils
import override_settings
25 from django
.contrib
.auth
import get_user_model
27 from mygpo
.podcasts
.models
import Podcast
, Episode
28 from mygpo
.history
.models
import EpisodeHistoryEntry
29 from mygpo
.maintenance
.merge
import PodcastMerger
36 class SimpleMergeTests(TestCase
):
39 self
.podcast1
= Podcast
.objects
.get_or_create_for_url(
40 'http://example.com/simple-merge-test-feed.rss',
41 defaults
={'title': 'Podcast 1'},
43 self
.podcast2
= Podcast
.objects
.get_or_create_for_url(
44 'http://simple-merge-test.org/podcast/',
45 defaults
={'title': 'Podcast 2'},
48 self
.episode1
= Episode
.objects
.get_or_create_for_url(
49 self
.podcast1
, 'http://example.com/simple-merge-test-episode1.mp3',
51 'title': 'Episode 1 A',
53 self
.episode2
= Episode
.objects
.get_or_create_for_url(
54 self
.podcast2
, 'http://example.com/simple-merge-test-episode1.mp3',
56 'title': 'Episode 1 B',
59 def test_merge_podcasts(self
):
60 merge(self
.podcast1
, self
.podcast2
)
63 @override_settings(CACHE
={})
64 class MergeTests(TransactionTestCase
):
65 """ Tests merging of two podcasts, their episodes and states """
68 self
.podcast1
= Podcast
.objects
.get_or_create_for_url(
69 'http://example.com/merge-test-feed.rss',
70 defaults
={'title': 'Podcast 1'},
72 self
.podcast2
= Podcast
.objects
.get_or_create_for_url(
73 'http://merge-test.org/podcast/',
74 defaults
={'title': 'Podcast 2'},
77 self
.episode1
= Episode
.objects
.get_or_create_for_url(
78 self
.podcast1
, 'http://example.com/merge-test-episode1.mp3',
80 'title': 'Episode 1 A',
82 self
.episode2
= Episode
.objects
.get_or_create_for_url(
83 self
.podcast2
, 'http://example.com/merge-test-episode1.mp3',
85 'title': 'Episode 1 B',
88 User
= get_user_model()
89 self
.user
= User(username
='test-merge')
90 self
.user
.email
= 'test-merge-tests@example.com'
91 self
.user
.set_password('secret!')
94 def test_merge_podcasts(self
):
96 action1
= EpisodeHistoryEntry
.objects
.create(
97 timestamp
=datetime
.utcnow(),
98 episode
=self
.episode1
,
101 action
=EpisodeHistoryEntry
.PLAY
,
102 podcast_ref_url
=None,
103 episode_ref_url
=None,
106 action2
= EpisodeHistoryEntry
.objects
.create(
107 timestamp
=datetime
.utcnow(),
108 episode
=self
.episode2
,
111 action
=EpisodeHistoryEntry
.DOWNLOAD
,
112 podcast_ref_url
=None,
113 episode_ref_url
=None,
116 # decide which episodes to merge
117 groups
= [(0, [self
.episode1
, self
.episode2
])]
120 pm
= PodcastMerger([self
.podcast1
, self
.podcast2
], counter
, groups
)
123 history
= EpisodeHistoryEntry
.objects
.filter(
124 episode
=self
.episode1
,
128 # both actions must be present for the merged episode
129 self
.assertIn(action1
, history
)
130 self
.assertIn(action2
, history
)
133 self
.episode1
.delete()
134 self
.podcast1
.delete()
138 class MergeGroupTests(TransactionTestCase
):
139 """ Tests merging of two podcasts, one of which is part of a group """
142 self
.podcast1
= Podcast
.objects
.get_or_create_for_url(
143 'http://example.com/group-merge-feed.rss',
145 'title': 'Podcast 1',
148 self
.podcast2
= Podcast
.objects
.get_or_create_for_url(
149 'http://test.org/group-merge-podcast/',
151 'title': 'Podcast 2',
154 self
.podcast3
= Podcast
.objects
.get_or_create_for_url(
155 'http://group-test.org/feed/',
157 'title': 'Podcast 3',
161 self
.episode1
= Episode
.objects
.get_or_create_for_url(
162 self
.podcast1
, 'http://example.com/group-merge-episode1.mp3',
164 'title': 'Episode 1 A',
167 self
.episode2
= Episode
.objects
.get_or_create_for_url(
168 self
.podcast2
, 'http://example.com/group-merge-episode1.mp3',
170 'title': 'Episode 1 B',
173 self
.episode3
= Episode
.objects
.get_or_create_for_url(
174 self
.podcast3
, 'http://example.com/group-merge-media.mp3',
176 'title': 'Episode 2',
180 self
.podcast2
.group_with(self
.podcast3
, 'My Group', 'Feed1', 'Feed2')
182 User
= get_user_model()
183 self
.user
= User(username
='test-merge-group')
184 self
.user
.email
= 'test-merge-group-tests@example.com'
185 self
.user
.set_password('secret!')
188 def test_merge_podcasts(self
):
189 podcast1
= Podcast
.objects
.get(pk
=self
.podcast1
.pk
)
190 podcast2
= Podcast
.objects
.get(pk
=self
.podcast2
.pk
)
191 podcast3
= Podcast
.objects
.get(pk
=self
.podcast3
.pk
)
193 # assert that the podcasts are actually grouped
194 self
.assertEqual(podcast2
.group
, podcast3
.group
)
196 action1
= EpisodeHistoryEntry
.objects
.create(
197 timestamp
=datetime
.utcnow(),
198 episode
=self
.episode1
,
201 action
=EpisodeHistoryEntry
.PLAY
,
202 podcast_ref_url
=None,
203 episode_ref_url
=None,
206 action2
= EpisodeHistoryEntry
.objects
.create(
207 timestamp
=datetime
.utcnow(),
208 episode
=self
.episode2
,
211 action
=EpisodeHistoryEntry
.DOWNLOAD
,
212 podcast_ref_url
=None,
213 episode_ref_url
=None,
216 # decide which episodes to merge
217 groups
= [(0, [self
.episode1
, self
.episode2
])]
220 episode2_id
= self
.episode2
.id
222 pm
= PodcastMerger([podcast2
, podcast1
], counter
, groups
)
225 history
= EpisodeHistoryEntry
.objects
.filter(
226 episode
= self
.episode1
,
230 self
.assertIn(action1
, history
)
231 self
.assertIn(action2
, history
)
233 episode1
= Episode
.objects
.get(pk
=self
.episode1
.pk
)
235 # episode2 has been merged into episode1, so it must contain its
237 self
.assertEqual([x
.uuid
for x
in episode1
.merged_uuids
.all()],
241 self
.episode1
.delete()
242 self
.podcast2
.delete()
247 suite
= unittest
.TestSuite()
248 suite
.addTest(unittest
.TestLoader().loadTestsFromTestCase(MergeTests
))
249 suite
.addTest(unittest
.TestLoader().loadTestsFromTestCase(MergeGroupTests
))