various fixes
[mygpo.git] / mygpo / maintenance / tests.py
blob0511bb6b422fea9cd2b4e02e2866f50181dd910d
2 # This file is part of my.gpodder.org.
4 # my.gpodder.org is free software: you can redistribute it and/or modify it
5 # under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or (at your
7 # option) any later version.
9 # my.gpodder.org is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
12 # License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with my.gpodder.org. If not, see <http://www.gnu.org/licenses/>.
18 from datetime import datetime
19 import unittest
20 from collections import Counter
22 from django.test import TestCase
23 from django.test.utils import override_settings
25 from mygpo.core.models import Podcast, Episode
26 from mygpo.users.models import EpisodeAction, User
27 from mygpo.maintenance.merge import PodcastMerger
28 from mygpo.utils import get_timestamp
29 from mygpo.db.couchdb.podcast import podcast_by_id
30 from mygpo.db.couchdb.episode import episode_by_id
31 from mygpo.db.couchdb.episode_state import episode_state_for_user_episode, \
32 add_episode_actions
35 @override_settings(CACHE={})
36 class MergeTests(TestCase):
37 """ Tests merging of two podcasts, their episodes and states """
39 def setUp(self):
40 self.podcast1 = Podcast(urls=['http://example.com/feed.rss'])
41 self.podcast2 = Podcast(urls=['http://test.org/podcast/'])
42 self.podcast1.save()
43 self.podcast2.save()
45 self.episode1 = Episode(podcast=self.podcast1.get_id(),
46 urls = ['http://example.com/episode1.mp3'])
47 self.episode2 = Episode(podcast=self.podcast2.get_id(),
48 urls = ['http://example.com/episode1.mp3'])
50 self.episode1.save()
51 self.episode2.save()
53 self.user = User(username='test-merge')
54 self.user.email = 'test@example.com'
55 self.user.set_password('secret!')
56 self.user.save()
59 def test_merge_podcasts(self):
61 # Create additional data that will be merged
62 state1 = episode_state_for_user_episode(self.user, self.episode1)
63 state2 = episode_state_for_user_episode(self.user, self.episode2)
65 action1 = EpisodeAction(action='play',
66 timestamp=datetime.utcnow(),
67 upload_timestamp=get_timestamp(datetime.utcnow()))
68 action2 = EpisodeAction(action='download',
69 timestamp=datetime.utcnow(),
70 upload_timestamp=get_timestamp(datetime.utcnow()))
72 add_episode_actions(state1, [action1])
73 add_episode_actions(state2, [action2])
75 # copy of the object
76 episode2 = episode_by_id(self.episode2._id)
78 # decide which episodes to merge
79 groups = [(0, [self.episode1, self.episode2])]
80 counter = Counter()
82 pm = PodcastMerger([self.podcast1, self.podcast2], counter, groups)
83 pm.merge()
85 state1 = episode_state_for_user_episode(self.user, self.episode1)
86 state2 = episode_state_for_user_episode(self.user, episode2)
88 self.assertIn(action1, state1.actions)
89 self.assertIn(action2, state1.actions)
90 self.assertEqual(state2._id, None)
94 def tearDown(self):
95 self.podcast1.delete()
96 self.episode1.delete()
98 #self.podcast2.delete()
99 #self.episode2.delete()
101 self.user.delete()
105 class MergeGroupTests(TestCase):
106 """ Tests merging of two podcasts, one of which is part of a group """
108 def setUp(self):
109 self.podcast1 = Podcast(urls=['http://example.com/feed.rss'])
110 self.podcast2 = Podcast(urls=['http://test.org/podcast/'])
111 self.podcast3 = Podcast(urls=['http://test.org/feed/'])
112 self.podcast1.save()
113 self.podcast2.save()
114 self.podcast3.save()
116 self.episode1 = Episode(podcast=self.podcast1.get_id(),
117 urls = ['http://example.com/episode1.mp3'])
118 self.episode2 = Episode(podcast=self.podcast2.get_id(),
119 urls = ['http://example.com/episode1.mp3'])
120 self.episode3 = Episode(podcast=self.podcast3.get_id(),
121 urls = ['http://example.com/media.mp3'])
124 self.episode1.save()
125 self.episode2.save()
126 self.episode3.save()
128 self.podcast2.group_with(self.podcast3, 'My Group', 'Feed1', 'Feed2')
130 self.user = User(username='test-merge-group')
131 self.user.email = 'test@example.com'
132 self.user.set_password('secret!')
133 self.user.save()
136 def test_merge_podcasts(self):
138 podcast1 = podcast_by_id(self.podcast1.get_id())
139 podcast2 = podcast_by_id(self.podcast2.get_id())
140 podcast3 = podcast_by_id(self.podcast3.get_id())
142 # assert that the podcasts are actually grouped
143 self.assertEqual(podcast2._id, podcast3._id)
144 self.assertNotEqual(podcast2.get_id(), podcast2._id)
145 self.assertNotEqual(podcast3.get_id(), podcast3._id)
147 # Create additional data that will be merged
148 state1 = episode_state_for_user_episode(self.user, self.episode1)
149 state2 = episode_state_for_user_episode(self.user, self.episode2)
151 action1 = EpisodeAction(action='play',
152 timestamp=datetime.utcnow(),
153 upload_timestamp=get_timestamp(datetime.utcnow()))
154 action2 = EpisodeAction(action='download',
155 timestamp=datetime.utcnow(),
156 upload_timestamp=get_timestamp(datetime.utcnow()))
158 add_episode_actions(state1, [action1])
159 add_episode_actions(state2, [action2])
161 # copy of the object
162 episode2 = episode_by_id(self.episode2._id)
164 # decide which episodes to merge
165 groups = [(0, [self.episode1, self.episode2])]
166 counter = Counter()
168 pm = PodcastMerger([podcast2, podcast1], counter, groups)
169 pm.merge()
171 state1 = episode_state_for_user_episode(self.user, self.episode1)
172 state2 = episode_state_for_user_episode(self.user, episode2)
174 self.assertIn(action1, state1.actions)
175 self.assertIn(action2, state1.actions)
176 self.assertEqual(state2._id, None)
178 episode1 = episode_by_id(self.episode1._id)
180 # episode2 has been merged into episode1, so it must contain its
181 # merged _id
182 self.assertEqual(episode1.merged_ids, [episode2._id])
186 def tearDown(self):
187 self.podcast2.delete()
188 self.episode1.delete()
190 #self.podcast2.delete()
191 #self.episode2.delete()
193 self.user.delete()
196 def suite():
197 suite = unittest.TestSuite()
198 suite.addTest(unittest.TestLoader().loadTestsFromTestCase(MergeTests))
199 suite.addTest(unittest.TestLoader().loadTestsFromTestCase(MergeGroupTests))
200 return suite