remove backported Counter class
[mygpo.git] / mygpo / maintenance / tests.py
blob5264f7ca2afa1efbb31839472194c8e6c4baa015
2 # This file is part of my.gpodder.org.
4 # my.gpodder.org is free software: you can redistribute it and/or modify it
5 # under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or (at your
7 # option) any later version.
9 # my.gpodder.org is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
12 # License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with my.gpodder.org. If not, see <http://www.gnu.org/licenses/>.
18 from datetime import datetime
19 import unittest
20 from collections import Counter
22 from django.test import TestCase
24 from mygpo.core.models import Podcast, Episode
25 from mygpo.users.models import EpisodeAction, User
26 from mygpo.maintenance.merge import PodcastMerger
27 from mygpo.utils import get_timestamp
28 from mygpo.db.couchdb.podcast import podcast_by_id
29 from mygpo.db.couchdb.episode import episode_by_id
30 from mygpo.db.couchdb.episode_state import episode_state_for_user_episode
class MergeTests(TestCase):
    """ Merges two podcasts and checks their episodes and episode states """

    def setUp(self):
        """ Saves two podcasts with one episode each, and a test user """
        self.podcast1 = Podcast(urls=['http://example.com/feed.rss'])
        self.podcast2 = Podcast(urls=['http://test.org/podcast/'])
        for podcast in (self.podcast1, self.podcast2):
            podcast.save()

        # both episodes deliberately share the same media URL
        self.episode1 = Episode(
            podcast=self.podcast1.get_id(),
            urls=['http://example.com/episode1.mp3'],
        )
        self.episode2 = Episode(
            podcast=self.podcast2.get_id(),
            urls=['http://example.com/episode1.mp3'],
        )
        for episode in (self.episode1, self.episode2):
            episode.save()

        self.user = User(username='test-merge')
        self.user.email = 'test@example.com'
        self.user.set_password('secret!')
        self.user.save()

    def test_merge_podcasts(self):
        """ After merging, the state of episode1 holds both actions

        The state that is looked up for the pre-merge copy of episode2
        is expected to have no _id.
        """

        # per-user states that give the merge something to combine
        first_state = episode_state_for_user_episode(self.user,
                                                     self.episode1)
        second_state = episode_state_for_user_episode(self.user,
                                                      self.episode2)

        play_action = EpisodeAction(
            action='play',
            timestamp=datetime.utcnow(),
            upload_timestamp=get_timestamp(datetime.utcnow()),
        )
        download_action = EpisodeAction(
            action='download',
            timestamp=datetime.utcnow(),
            upload_timestamp=get_timestamp(datetime.utcnow()),
        )

        first_state.add_actions([play_action])
        second_state.add_actions([download_action])

        first_state.save()
        second_state.save()

        # fetch a separate copy of episode2 before the merge is run
        episode2_copy = episode_by_id(self.episode2._id)

        # a single group that pairs the two episodes for merging
        episode_groups = [(0, [self.episode1, self.episode2])]
        merge_counter = Counter()

        merger = PodcastMerger([self.podcast1, self.podcast2],
                               merge_counter, episode_groups)
        merger.merge()

        # look the states up again now that the merge has run
        first_state = episode_state_for_user_episode(self.user,
                                                     self.episode1)
        second_state = episode_state_for_user_episode(self.user,
                                                      episode2_copy)

        self.assertIn(play_action, first_state.actions)
        self.assertIn(download_action, first_state.actions)
        self.assertEqual(second_state._id, None)

    def tearDown(self):
        """ Deletes podcast1, episode1 and the test user """
        self.podcast1.delete()
        self.episode1.delete()

        # NOTE(review): podcast2 and episode2 are intentionally not deleted
        # here; presumably the merge has already removed them -- confirm.

        self.user.delete()
class MergeGroupTests(TestCase):
    """ Merges two podcasts where one of them belongs to a podcast group """

    def setUp(self):
        """ Saves three podcasts with one episode each, groups two of them

        podcast2 and podcast3 are put into the group 'My Group'; a test
        user is stored as well.
        """
        self.podcast1 = Podcast(urls=['http://example.com/feed.rss'])
        self.podcast2 = Podcast(urls=['http://test.org/podcast/'])
        self.podcast3 = Podcast(urls=['http://test.org/feed/'])
        for podcast in (self.podcast1, self.podcast2, self.podcast3):
            podcast.save()

        # episode1 and episode2 share a media URL, episode3 has its own
        self.episode1 = Episode(
            podcast=self.podcast1.get_id(),
            urls=['http://example.com/episode1.mp3'],
        )
        self.episode2 = Episode(
            podcast=self.podcast2.get_id(),
            urls=['http://example.com/episode1.mp3'],
        )
        self.episode3 = Episode(
            podcast=self.podcast3.get_id(),
            urls=['http://example.com/media.mp3'],
        )
        for episode in (self.episode1, self.episode2, self.episode3):
            episode.save()

        self.podcast2.group_with(self.podcast3, 'My Group', 'Feed1', 'Feed2')

        self.user = User(username='test-merge-group')
        self.user.email = 'test@example.com'
        self.user.set_password('secret!')
        self.user.save()

    def test_merge_podcasts(self):
        """ After merging, episode1 holds both actions and the merged id """

        # reload all podcasts so that the grouping is reflected
        loaded1 = podcast_by_id(self.podcast1.get_id())
        loaded2 = podcast_by_id(self.podcast2.get_id())
        loaded3 = podcast_by_id(self.podcast3.get_id())

        # grouped podcasts share one _id that differs from their get_id()
        self.assertEqual(loaded2._id, loaded3._id)
        self.assertNotEqual(loaded2.get_id(), loaded2._id)
        self.assertNotEqual(loaded3.get_id(), loaded3._id)

        # per-user states that give the merge something to combine
        first_state = episode_state_for_user_episode(self.user,
                                                     self.episode1)
        second_state = episode_state_for_user_episode(self.user,
                                                      self.episode2)

        play_action = EpisodeAction(
            action='play',
            timestamp=datetime.utcnow(),
            upload_timestamp=get_timestamp(datetime.utcnow()),
        )
        download_action = EpisodeAction(
            action='download',
            timestamp=datetime.utcnow(),
            upload_timestamp=get_timestamp(datetime.utcnow()),
        )

        first_state.add_actions([play_action])
        second_state.add_actions([download_action])

        first_state.save()
        second_state.save()

        # fetch a separate copy of episode2 before the merge is run
        episode2_copy = episode_by_id(self.episode2._id)

        # a single group that pairs the two episodes for merging
        episode_groups = [(0, [self.episode1, self.episode2])]
        merge_counter = Counter()

        # the grouped podcast comes first in the list passed to the merger
        merger = PodcastMerger([loaded2, loaded1],
                               merge_counter, episode_groups)
        merger.merge()

        # look the states up again now that the merge has run
        first_state = episode_state_for_user_episode(self.user,
                                                     self.episode1)
        second_state = episode_state_for_user_episode(self.user,
                                                      episode2_copy)

        self.assertIn(play_action, first_state.actions)
        self.assertIn(download_action, first_state.actions)
        self.assertEqual(second_state._id, None)

        reloaded_episode1 = episode_by_id(self.episode1._id)

        # episode2 has been merged into episode1, so episode1 must list
        # the _id of episode2 as its only merged id
        self.assertEqual(reloaded_episode1.merged_ids, [episode2_copy._id])

    def tearDown(self):
        """ Deletes podcast2, episode1 and the test user """
        self.podcast2.delete()
        self.episode1.delete()

        # NOTE(review): podcast1, podcast3, episode2 and episode3 are not
        # deleted here; presumably the merge (and deleting the grouped
        # podcast2) takes care of them -- confirm that nothing is left
        # behind in the test database.

        self.user.delete()
def suite():
    """ Returns a TestSuite with the tests of both merge test cases """
    loader = unittest.TestLoader()
    merge_suite = unittest.TestSuite()
    # keep the original order: MergeTests first, then MergeGroupTests
    for test_case in (MergeTests, MergeGroupTests):
        merge_suite.addTest(loader.loadTestsFromTestCase(test_case))
    return merge_suite