2 # This file is part of my.gpodder.org.
4 # my.gpodder.org is free software: you can redistribute it and/or modify it
5 # under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or (at your
7 # option) any later version.
9 # my.gpodder.org is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
12 # License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with my.gpodder.org. If not, see <http://www.gnu.org/licenses/>.
19 from datetime
import datetime
21 from collections
import Counter
23 from django
.test
import TestCase
, TransactionTestCase
24 from django
.test
.utils
import override_settings
25 from django
.contrib
.auth
import get_user_model
27 from mygpo
.podcasts
.models
import Podcast
, Episode
28 from mygpo
.users
.models
import EpisodeAction
29 from mygpo
.maintenance
.merge
import PodcastMerger
30 from mygpo
.utils
import get_timestamp
31 from mygpo
.db
.couchdb
.episode_state
import episode_state_for_user_episode
, \
39 class SimpleMergeTests(TestCase
):
42 self
.podcast1
= Podcast
.objects
.get_or_create_for_url(
43 'http://example.com/simple-merge-test-feed.rss',
44 defaults
={'title': 'Podcast 1'},
46 self
.podcast2
= Podcast
.objects
.get_or_create_for_url(
47 'http://simple-merge-test.org/podcast/',
48 defaults
={'title': 'Podcast 2'},
51 self
.episode1
= Episode
.objects
.get_or_create_for_url(
52 self
.podcast1
, 'http://example.com/simple-merge-test-episode1.mp3',
54 'title': 'Episode 1 A',
56 self
.episode2
= Episode
.objects
.get_or_create_for_url(
57 self
.podcast2
, 'http://example.com/simple-merge-test-episode1.mp3',
59 'title': 'Episode 1 B',
62 def test_merge_podcasts(self
):
63 merge(self
.podcast1
, self
.podcast2
)
66 @override_settings(CACHE
={})
67 class MergeTests(TransactionTestCase
):
68 """ Tests merging of two podcasts, their episodes and states """
71 self
.podcast1
= Podcast
.objects
.get_or_create_for_url(
72 'http://example.com/merge-test-feed.rss',
73 defaults
={'title': 'Podcast 1'},
75 self
.podcast2
= Podcast
.objects
.get_or_create_for_url(
76 'http://merge-test.org/podcast/',
77 defaults
={'title': 'Podcast 2'},
80 self
.episode1
= Episode
.objects
.get_or_create_for_url(
81 self
.podcast1
, 'http://example.com/merge-test-episode1.mp3',
83 'title': 'Episode 1 A',
85 self
.episode2
= Episode
.objects
.get_or_create_for_url(
86 self
.podcast2
, 'http://example.com/merge-test-episode1.mp3',
88 'title': 'Episode 1 B',
91 User
= get_user_model()
92 self
.user
= User(username
='test-merge')
93 self
.user
.email
= 'test-merge-tests@example.com'
94 self
.user
.set_password('secret!')
97 def test_merge_podcasts(self
):
99 # Create additional data that will be merged
100 state1
= episode_state_for_user_episode(self
.user
, self
.episode1
)
101 state2
= episode_state_for_user_episode(self
.user
, self
.episode2
)
103 action1
= EpisodeAction(action
='play',
104 timestamp
=datetime
.utcnow(),
105 upload_timestamp
=get_timestamp(
107 action2
= EpisodeAction(action
='download',
108 timestamp
=datetime
.utcnow(),
109 upload_timestamp
=get_timestamp(
112 add_episode_actions(state1
, [action1
])
113 add_episode_actions(state2
, [action2
])
115 # decide which episodes to merge
116 groups
= [(0, [self
.episode1
, self
.episode2
])]
119 pm
= PodcastMerger([self
.podcast1
, self
.podcast2
], counter
, groups
)
122 state1
= episode_state_for_user_episode(self
.user
, self
.episode1
)
124 # both actions must be present in state1
125 self
.assertIn(action1
, state1
.actions
)
126 self
.assertIn(action2
, state1
.actions
)
129 self
.episode1
.delete()
130 self
.podcast1
.delete()
134 class MergeGroupTests(TransactionTestCase
):
135 """ Tests merging of two podcasts, one of which is part of a group """
138 self
.podcast1
= Podcast
.objects
.get_or_create_for_url(
139 'http://example.com/group-merge-feed.rss',
141 'title': 'Podcast 1',
144 self
.podcast2
= Podcast
.objects
.get_or_create_for_url(
145 'http://test.org/group-merge-podcast/',
147 'title': 'Podcast 2',
150 self
.podcast3
= Podcast
.objects
.get_or_create_for_url(
151 'http://group-test.org/feed/',
153 'title': 'Podcast 3',
157 self
.episode1
= Episode
.objects
.get_or_create_for_url(
158 self
.podcast1
, 'http://example.com/group-merge-episode1.mp3',
160 'title': 'Episode 1 A',
163 self
.episode2
= Episode
.objects
.get_or_create_for_url(
164 self
.podcast2
, 'http://example.com/group-merge-episode1.mp3',
166 'title': 'Episode 1 B',
169 self
.episode3
= Episode
.objects
.get_or_create_for_url(
170 self
.podcast3
, 'http://example.com/group-merge-media.mp3',
172 'title': 'Episode 2',
176 self
.podcast2
.group_with(self
.podcast3
, 'My Group', 'Feed1', 'Feed2')
178 User
= get_user_model()
179 self
.user
= User(username
='test-merge-group')
180 self
.user
.email
= 'test-merge-group-tests@example.com'
181 self
.user
.set_password('secret!')
184 def test_merge_podcasts(self
):
185 podcast1
= Podcast
.objects
.get(pk
=self
.podcast1
.pk
)
186 podcast2
= Podcast
.objects
.get(pk
=self
.podcast2
.pk
)
187 podcast3
= Podcast
.objects
.get(pk
=self
.podcast3
.pk
)
189 # assert that the podcasts are actually grouped
190 self
.assertEqual(podcast2
.group
, podcast3
.group
)
192 # Create additional data that will be merged
193 state1
= episode_state_for_user_episode(self
.user
, self
.episode1
)
194 state2
= episode_state_for_user_episode(self
.user
, self
.episode2
)
196 action1
= EpisodeAction(action
='play',
197 timestamp
=datetime
.utcnow(),
198 upload_timestamp
=get_timestamp(
200 action2
= EpisodeAction(action
='download',
201 timestamp
=datetime
.utcnow(),
202 upload_timestamp
=get_timestamp(
205 add_episode_actions(state1
, [action1
])
206 add_episode_actions(state2
, [action2
])
208 # decide which episodes to merge
209 groups
= [(0, [self
.episode1
, self
.episode2
])]
212 episode2_id
= self
.episode2
.id
214 pm
= PodcastMerger([podcast2
, podcast1
], counter
, groups
)
217 state1
= episode_state_for_user_episode(self
.user
, self
.episode1
)
219 self
.assertIn(action1
, state1
.actions
)
220 self
.assertIn(action2
, state1
.actions
)
222 episode1
= Episode
.objects
.get(pk
=self
.episode1
.pk
)
224 # episode2 has been merged into episode1, so it must contain its
226 self
.assertEqual([x
.uuid
for x
in episode1
.merged_uuids
.all()],
230 self
.episode1
.delete()
231 self
.podcast2
.delete()
236 suite
= unittest
.TestSuite()
237 suite
.addTest(unittest
.TestLoader().loadTestsFromTestCase(MergeTests
))
238 suite
.addTest(unittest
.TestLoader().loadTestsFromTestCase(MergeGroupTests
))