#
# This file is part of my.gpodder.org.
#
# my.gpodder.org is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# my.gpodder.org is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
# License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with my.gpodder.org. If not, see <http://www.gnu.org/licenses/>.
#
import unittest
from datetime import datetime
from collections import Counter

from django.test import TestCase, TransactionTestCase
from django.test.utils import override_settings
from django.contrib.auth import get_user_model

from mygpo.podcasts.models import Podcast, Episode
from mygpo.history.models import EpisodeHistoryEntry
from mygpo.maintenance.merge import PodcastMerger
class SimpleMergeTests(TestCase):
    """ Merges two podcasts whose episodes share the same media URL """

    def setUp(self):
        """ Creates two podcasts with one episode each """
        self.podcast1 = Podcast.objects.get_or_create_for_url(
            'http://example.com/simple-merge-test-feed.rss',
            defaults={'title': 'Podcast 1'},
        )
        self.podcast2 = Podcast.objects.get_or_create_for_url(
            'http://simple-merge-test.org/podcast/',
            defaults={'title': 'Podcast 2'},
        )

        # both episodes deliberately use the same media URL
        self.episode1 = Episode.objects.get_or_create_for_url(
            self.podcast1, 'http://example.com/simple-merge-test-episode1.mp3',
            defaults={
                'title': 'Episode 1 A',
            })
        self.episode2 = Episode.objects.get_or_create_for_url(
            self.podcast2, 'http://example.com/simple-merge-test-episode1.mp3',
            defaults={
                'title': 'Episode 1 B',
            })

    def test_merge_podcasts(self):
        """ Builds a merger for both podcasts and runs it """
        # decide which episodes to merge
        groups = [(0, [self.episode1, self.episode2])]

        # the merger is handed a counter; a fresh Counter is used here
        counter = Counter()
        pm = PodcastMerger([self.podcast1, self.podcast2], counter, groups)

        # NOTE(review): the call that actually runs the merge is not visible
        # in the damaged source; merge() is reconstructed -- confirm against
        # the PodcastMerger API
        pm.merge()
@override_settings(CACHE={})
class MergeTests(TransactionTestCase):
    """ Tests merging of two podcasts, their episodes and states """

    def setUp(self):
        """ Creates two podcasts with one episode each, and a user """
        self.podcast1 = Podcast.objects.get_or_create_for_url(
            'http://example.com/merge-test-feed.rss',
            defaults={'title': 'Podcast 1'},
        )
        self.podcast2 = Podcast.objects.get_or_create_for_url(
            'http://merge-test.org/podcast/',
            defaults={'title': 'Podcast 2'},
        )

        # both episodes deliberately use the same media URL
        self.episode1 = Episode.objects.get_or_create_for_url(
            self.podcast1, 'http://example.com/merge-test-episode1.mp3',
            defaults={
                'title': 'Episode 1 A',
            })
        self.episode2 = Episode.objects.get_or_create_for_url(
            self.podcast2, 'http://example.com/merge-test-episode1.mp3',
            defaults={
                'title': 'Episode 1 B',
            })

        User = get_user_model()
        self.user = User(username='test-merge')
        self.user.email = 'test-merge-tests@example.com'
        self.user.set_password('secret!')
        # NOTE(review): the visible source builds the user but the lines
        # that persist and use it are missing; save() is reconstructed
        self.user.save()

    def test_merge_podcasts(self):
        """ History entries of both episodes must end up on episode1 """
        # NOTE(review): user=self.user is reconstructed in the calls below;
        # confirm against the EpisodeHistoryEntry model
        action1 = EpisodeHistoryEntry.objects.create(
            timestamp=datetime.utcnow(),
            episode=self.episode1,
            user=self.user,
            action=EpisodeHistoryEntry.PLAY,
            podcast_ref_url=None,
            episode_ref_url=None,
        )

        action2 = EpisodeHistoryEntry.objects.create(
            timestamp=datetime.utcnow(),
            episode=self.episode2,
            user=self.user,
            action=EpisodeHistoryEntry.DOWNLOAD,
            podcast_ref_url=None,
            episode_ref_url=None,
        )

        # decide which episodes to merge
        groups = [(0, [self.episode1, self.episode2])]
        counter = Counter()

        pm = PodcastMerger([self.podcast1, self.podcast2], counter, groups)
        # NOTE(review): merge() is reconstructed -- the assertions below can
        # only hold once the merge has run; confirm the method name
        pm.merge()

        history = EpisodeHistoryEntry.objects.filter(
            episode=self.episode1,
            user=self.user,
        )

        # both actions must be present for the merged episode
        self.assertIn(action1, history)
        self.assertIn(action2, history)

    def tearDown(self):
        """ Removes the objects that remain after the merge """
        self.episode1.delete()
        self.podcast1.delete()
class MergeGroupTests(TransactionTestCase):
    """ Tests merging of two podcasts, one of which is part of a group """

    def setUp(self):
        """ Creates three podcasts with one episode each, and a user

        podcast2 and podcast3 are put into a common group. """
        self.podcast1 = Podcast.objects.get_or_create_for_url(
            'http://example.com/group-merge-feed.rss',
            defaults={
                'title': 'Podcast 1',
            },
        )
        self.podcast2 = Podcast.objects.get_or_create_for_url(
            'http://test.org/group-merge-podcast/',
            defaults={
                'title': 'Podcast 2',
            },
        )
        self.podcast3 = Podcast.objects.get_or_create_for_url(
            'http://group-test.org/feed/',
            defaults={
                'title': 'Podcast 3',
            },
        )

        # episode1 and episode2 deliberately use the same media URL
        self.episode1 = Episode.objects.get_or_create_for_url(
            self.podcast1, 'http://example.com/group-merge-episode1.mp3',
            defaults={
                'title': 'Episode 1 A',
            },
        )
        self.episode2 = Episode.objects.get_or_create_for_url(
            self.podcast2, 'http://example.com/group-merge-episode1.mp3',
            defaults={
                'title': 'Episode 1 B',
            },
        )
        self.episode3 = Episode.objects.get_or_create_for_url(
            self.podcast3, 'http://example.com/group-merge-media.mp3',
            defaults={
                'title': 'Episode 2',
            },
        )

        self.podcast2.group_with(self.podcast3, 'My Group', 'Feed1', 'Feed2')

        User = get_user_model()
        self.user = User(username='test-merge-group')
        self.user.email = 'test-merge-group-tests@example.com'
        self.user.set_password('secret!')
        # NOTE(review): the visible source builds the user but the lines
        # that persist and use it are missing; save() is reconstructed
        self.user.save()

    def test_merge_podcasts(self):
        """ Merges podcast1 into the grouped podcast2 and checks the result """
        # fetch fresh instances, so that the grouping done in setUp is seen
        podcast1 = Podcast.objects.get(pk=self.podcast1.pk)
        podcast2 = Podcast.objects.get(pk=self.podcast2.pk)
        podcast3 = Podcast.objects.get(pk=self.podcast3.pk)

        # assert that the podcasts are actually grouped
        self.assertEqual(podcast2.group, podcast3.group)

        # NOTE(review): user=self.user is reconstructed in the calls below;
        # confirm against the EpisodeHistoryEntry model
        action1 = EpisodeHistoryEntry.objects.create(
            timestamp=datetime.utcnow(),
            episode=self.episode1,
            user=self.user,
            action=EpisodeHistoryEntry.PLAY,
            podcast_ref_url=None,
            episode_ref_url=None,
        )

        action2 = EpisodeHistoryEntry.objects.create(
            timestamp=datetime.utcnow(),
            episode=self.episode2,
            user=self.user,
            action=EpisodeHistoryEntry.DOWNLOAD,
            podcast_ref_url=None,
            episode_ref_url=None,
        )

        # decide which episodes to merge
        groups = [(0, [self.episode1, self.episode2])]
        counter = Counter()

        # remember the id before merging, it is compared below
        episode2_id = self.episode2.id

        pm = PodcastMerger([podcast2, podcast1], counter, groups)
        # NOTE(review): merge() is reconstructed -- the assertions below can
        # only hold once the merge has run; confirm the method name
        pm.merge()

        history = EpisodeHistoryEntry.objects.filter(
            episode=self.episode1,
            user=self.user,
        )

        # both actions must be present for the merged episode
        self.assertIn(action1, history)
        self.assertIn(action2, history)

        episode1 = Episode.objects.get(pk=self.episode1.pk)

        # episode2 has been merged into episode1, so it must contain its
        # id among the merged uuids
        # NOTE(review): the expected value was lost in the damaged source;
        # [episode2_id] is reconstructed from the otherwise unused variable
        self.assertEqual([x.uuid for x in episode1.merged_uuids.all()],
                         [episode2_id])

    def tearDown(self):
        """ Removes the objects that remain after the merge """
        self.episode1.delete()
        self.podcast2.delete()
def load_tests(loader, tests, ignore):
    """ Adds MergeTests and MergeGroupTests to the given suite

    Implements the unittest load_tests protocol: the suite that is
    returned is the one that is run, so it has to be handed back.

    loader -- the loader calling this hook (unused; as before, a fresh
              TestLoader collects the test cases)
    tests  -- the suite the test cases are added to
    ignore -- third argument of the protocol (unused)

    Returns the suite passed in as tests. """
    for test_case in (MergeTests, MergeGroupTests):
        tests.addTest(unittest.TestLoader().loadTestsFromTestCase(test_case))

    # without this the caller would receive None instead of a suite
    return tests