2 # This file is part of my.gpodder.org.
4 # my.gpodder.org is free software: you can redistribute it and/or modify it
5 # under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or (at your
7 # option) any later version.
9 # my.gpodder.org is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
12 # License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with my.gpodder.org. If not, see <http://www.gnu.org/licenses/>.
18 from datetime
import datetime
20 from collections
import Counter
22 from django
.test
import TestCase
, TransactionTestCase
23 from django
.test
.utils
import override_settings
25 from mygpo
.podcasts
.models
import Podcast
, Episode
26 from mygpo
.users
.models
import EpisodeAction
, User
27 from mygpo
.maintenance
.merge
import PodcastMerger
28 from mygpo
.utils
import get_timestamp
29 from mygpo
.db
.couchdb
.episode_state
import episode_state_for_user_episode
, \
33 @override_settings(CACHE
={})
34 class MergeTests(TransactionTestCase
):
35 """ Tests merging of two podcasts, their episodes and states """
38 self
.podcast1
= Podcast
.objects
.get_or_create_for_url(
39 'http://example.com/merge-test-feed.rss',
40 defaults
={'title': 'Podcast 1'},
42 self
.podcast2
= Podcast
.objects
.get_or_create_for_url(
43 'http://merge-test.org/podcast/',
44 defaults
={'title': 'Podcast 2'},
47 self
.episode1
= Episode
.objects
.get_or_create_for_url(
48 self
.podcast1
, 'http://example.com/merge-test-episode1.mp3',
50 'title': 'Episode 1 A',
52 self
.episode2
= Episode
.objects
.get_or_create_for_url(
53 self
.podcast2
, 'http://example.com/merge-test-episode1.mp3',
55 'title': 'Episode 1 B',
58 self
.user
= User(username
='test-merge')
59 self
.user
.email
= 'test-merge-tests@example.com'
60 self
.user
.set_password('secret!')
63 def test_merge_podcasts(self
):
65 # Create additional data that will be merged
66 state1
= episode_state_for_user_episode(self
.user
, self
.episode1
)
67 state2
= episode_state_for_user_episode(self
.user
, self
.episode2
)
69 action1
= EpisodeAction(action
='play',
70 timestamp
=datetime
.utcnow(),
71 upload_timestamp
=get_timestamp(
73 action2
= EpisodeAction(action
='download',
74 timestamp
=datetime
.utcnow(),
75 upload_timestamp
=get_timestamp(
78 add_episode_actions(state1
, [action1
])
79 add_episode_actions(state2
, [action2
])
81 # decide which episodes to merge
82 groups
= [(0, [self
.episode1
, self
.episode2
])]
85 pm
= PodcastMerger([self
.podcast1
, self
.podcast2
], counter
, groups
)
88 state1
= episode_state_for_user_episode(self
.user
, self
.episode1
)
90 # both actions must be present in state1
91 self
.assertIn(action1
, state1
.actions
)
92 self
.assertIn(action2
, state1
.actions
)
95 self
.episode1
.delete()
96 self
.podcast1
.delete()
100 class MergeGroupTests(TransactionTestCase
):
101 """ Tests merging of two podcasts, one of which is part of a group """
104 self
.podcast1
= Podcast
.objects
.get_or_create_for_url(
105 'http://example.com/group-merge-feed.rss',
107 'title': 'Podcast 1',
110 self
.podcast2
= Podcast
.objects
.get_or_create_for_url(
111 'http://test.org/group-merge-podcast/',
113 'title': 'Podcast 2',
116 self
.podcast3
= Podcast
.objects
.get_or_create_for_url(
117 'http://group-test.org/feed/',
119 'title': 'Podcast 3',
123 self
.episode1
= Episode
.objects
.get_or_create_for_url(
124 self
.podcast1
, 'http://example.com/group-merge-episode1.mp3',
126 'title': 'Episode 1 A',
129 self
.episode2
= Episode
.objects
.get_or_create_for_url(
130 self
.podcast2
, 'http://example.com/group-merge-episode1.mp3',
132 'title': 'Episode 1 B',
135 self
.episode3
= Episode
.objects
.get_or_create_for_url(
136 self
.podcast3
, 'http://example.com/group-merge-media.mp3',
138 'title': 'Episode 2',
142 self
.podcast2
.group_with(self
.podcast3
, 'My Group', 'Feed1', 'Feed2')
144 self
.user
= User(username
='test-merge-group')
145 self
.user
.email
= 'test-merge-group-tests@example.com'
146 self
.user
.set_password('secret!')
149 def test_merge_podcasts(self
):
150 podcast1
= Podcast
.objects
.get(pk
=self
.podcast1
.pk
)
151 podcast2
= Podcast
.objects
.get(pk
=self
.podcast2
.pk
)
152 podcast3
= Podcast
.objects
.get(pk
=self
.podcast3
.pk
)
154 # assert that the podcasts are actually grouped
155 self
.assertEqual(podcast2
.group
, podcast3
.group
)
157 # Create additional data that will be merged
158 state1
= episode_state_for_user_episode(self
.user
, self
.episode1
)
159 state2
= episode_state_for_user_episode(self
.user
, self
.episode2
)
161 action1
= EpisodeAction(action
='play',
162 timestamp
=datetime
.utcnow(),
163 upload_timestamp
=get_timestamp(
165 action2
= EpisodeAction(action
='download',
166 timestamp
=datetime
.utcnow(),
167 upload_timestamp
=get_timestamp(
170 add_episode_actions(state1
, [action1
])
171 add_episode_actions(state2
, [action2
])
173 # decide which episodes to merge
174 groups
= [(0, [self
.episode1
, self
.episode2
])]
177 episode2_id
= self
.episode2
.id
179 pm
= PodcastMerger([podcast2
, podcast1
], counter
, groups
)
182 state1
= episode_state_for_user_episode(self
.user
, self
.episode1
)
184 self
.assertIn(action1
, state1
.actions
)
185 self
.assertIn(action2
, state1
.actions
)
187 episode1
= Episode
.objects
.get(pk
=self
.episode1
.pk
)
189 # episode2 has been merged into episode1, so it must contain its
191 self
.assertEqual([x
.uuid
for x
in episode1
.merged_uuids
.all()],
195 self
.episode1
.delete()
196 self
.podcast2
.delete()
201 suite
= unittest
.TestSuite()
202 suite
.addTest(unittest
.TestLoader().loadTestsFromTestCase(MergeTests
))
203 suite
.addTest(unittest
.TestLoader().loadTestsFromTestCase(MergeGroupTests
))