extend tests for podcast and episode merging
[mygpo.git] / mygpo / maintenance / tests.py
blob2377a240a0b61df5172ffacc258750feeccf6e72
2 # This file is part of my.gpodder.org.
4 # my.gpodder.org is free software: you can redistribute it and/or modify it
5 # under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or (at your
7 # option) any later version.
9 # my.gpodder.org is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
12 # License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with my.gpodder.org. If not, see <http://www.gnu.org/licenses/>.
18 from datetime import datetime
19 import unittest
21 from django.test import TestCase
23 from mygpo.core.models import Podcast, Episode
24 from mygpo.users.models import EpisodeAction, User
25 from mygpo.maintenance.merge import PodcastMerger
26 from mygpo.counter import Counter
27 from mygpo.db.couchdb.podcast import podcast_by_id
28 from mygpo.db.couchdb.episode import episode_by_id
29 from mygpo.db.couchdb.episode_state import episode_state_for_user_episode
32 class MergeTests(TestCase):
33 """ Tests merging of two podcasts, their episodes and states """
35 def setUp(self):
36 self.podcast1 = Podcast(urls=['http://example.com/feed.rss'])
37 self.podcast2 = Podcast(urls=['http://test.org/podcast/'])
38 self.podcast1.save()
39 self.podcast2.save()
41 self.episode1 = Episode(podcast=self.podcast1.get_id(),
42 urls = ['http://example.com/episode1.mp3'])
43 self.episode2 = Episode(podcast=self.podcast2.get_id(),
44 urls = ['http://example.com/episode1.mp3'])
46 self.episode1.save()
47 self.episode2.save()
49 self.user = User(username='test')
50 self.user.email = 'test@example.com'
51 self.user.set_password('secret!')
52 self.user.save()
55 def test_merge_podcasts(self):
57 # Create additional data that will be merged
58 state1 = episode_state_for_user_episode(self.user, self.episode1)
59 state2 = episode_state_for_user_episode(self.user, self.episode2)
61 action1 = EpisodeAction(action='play', timestamp=datetime.utcnow())
62 action2 = EpisodeAction(action='download', timestamp=datetime.utcnow())
64 state1.add_actions([action1])
65 state2.add_actions([action2])
67 state1.save()
68 state2.save()
70 # copy of the object
71 episode2 = episode_by_id(self.episode2._id)
73 # decide which episodes to merge
74 groups = [(0, [self.episode1, self.episode2])]
75 counter = Counter()
77 pm = PodcastMerger([self.podcast1, self.podcast2], counter, groups)
78 pm.merge()
80 state1 = episode_state_for_user_episode(self.user, self.episode1)
81 state2 = episode_state_for_user_episode(self.user, episode2)
83 self.assertIn(action1, state1.actions)
84 self.assertIn(action2, state1.actions)
85 self.assertEqual(state2._id, None)
89 def tearDown(self):
90 self.podcast1.delete()
91 self.episode1.delete()
93 #self.podcast2.delete()
94 #self.episode2.delete()
96 self.user.delete()
100 class MergeGroupTests(TestCase):
101 """ Tests merging of two podcasts, one of which is part of a group """
103 def setUp(self):
104 self.podcast1 = Podcast(urls=['http://example.com/feed.rss'])
105 self.podcast2 = Podcast(urls=['http://test.org/podcast/'])
106 self.podcast3 = Podcast(urls=['http://test.org/feed/'])
107 self.podcast1.save()
108 self.podcast2.save()
109 self.podcast3.save()
111 self.episode1 = Episode(podcast=self.podcast1.get_id(),
112 urls = ['http://example.com/episode1.mp3'])
113 self.episode2 = Episode(podcast=self.podcast2.get_id(),
114 urls = ['http://example.com/episode1.mp3'])
115 self.episode3 = Episode(podcast=self.podcast3.get_id(),
116 urls = ['http://example.com/media.mp3'])
119 self.episode1.save()
120 self.episode2.save()
121 self.episode3.save()
123 self.podcast2.group_with(self.podcast3, 'My Group', 'Feed1', 'Feed2')
125 self.user = User(username='test')
126 self.user.email = 'test@example.com'
127 self.user.set_password('secret!')
128 self.user.save()
131 def test_merge_podcasts(self):
133 podcast1 = podcast_by_id(self.podcast1.get_id())
134 podcast2 = podcast_by_id(self.podcast2.get_id())
135 podcast3 = podcast_by_id(self.podcast3.get_id())
137 # assert that the podcasts are actually grouped
138 self.assertEqual(podcast2._id, podcast3._id)
139 self.assertNotEqual(podcast2.get_id(), podcast2._id)
140 self.assertNotEqual(podcast3.get_id(), podcast3._id)
142 # Create additional data that will be merged
143 state1 = episode_state_for_user_episode(self.user, self.episode1)
144 state2 = episode_state_for_user_episode(self.user, self.episode2)
146 action1 = EpisodeAction(action='play', timestamp=datetime.utcnow())
147 action2 = EpisodeAction(action='download', timestamp=datetime.utcnow())
149 state1.add_actions([action1])
150 state2.add_actions([action2])
152 state1.save()
153 state2.save()
155 # copy of the object
156 episode2 = episode_by_id(self.episode2._id)
158 # decide which episodes to merge
159 groups = [(0, [self.episode1, self.episode2])]
160 counter = Counter()
162 pm = PodcastMerger([podcast2, podcast1], counter, groups)
163 pm.merge()
165 state1 = episode_state_for_user_episode(self.user, self.episode1)
166 state2 = episode_state_for_user_episode(self.user, episode2)
168 self.assertIn(action1, state1.actions)
169 self.assertIn(action2, state1.actions)
170 self.assertEqual(state2._id, None)
172 episode1 = episode_by_id(self.episode1._id)
174 # episode2 has been merged into episode1, so it must contain its
175 # merged _id
176 self.assertEqual(episode1.merged_ids, [episode2._id])
180 def tearDown(self):
181 self.podcast2.delete()
182 self.episode1.delete()
184 #self.podcast2.delete()
185 #self.episode2.delete()
187 self.user.delete()
190 def suite():
191 suite = unittest.TestSuite()
192 suite.addTest(unittest.TestLoader().loadTestsFromTestCase(MergeTests))
193 suite.addTest(unittest.TestLoader().loadTestsFromTestCase(MergeGroupTests))
194 return suite