2 # This file is part of my.gpodder.org.
4 # my.gpodder.org is free software: you can redistribute it and/or modify it
5 # under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or (at your
7 # option) any later version.
9 # my.gpodder.org is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
12 # License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with my.gpodder.org. If not, see <http://www.gnu.org/licenses/>.
from datetime import datetime
import unittest

from django.test import TestCase

from mygpo.core.models import Podcast, Episode
from mygpo.users.models import EpisodeAction, User
from mygpo.maintenance.merge import PodcastMerger
from mygpo.counter import Counter
from mygpo.db.couchdb.podcast import podcast_by_id
from mygpo.db.couchdb.episode import episode_by_id
from mygpo.db.couchdb.episode_state import episode_state_for_user_episode
class MergeTests(TestCase):
    """ Tests merging of two podcasts, their episodes and states """

    def setUp(self):
        """ Stores two podcasts with one episode each, and a test user """
        self.podcast1 = Podcast(urls=['http://example.com/feed.rss'])
        self.podcast2 = Podcast(urls=['http://test.org/podcast/'])
        # store the fixtures so that they can be looked up by id in the test
        # and deleted again in tearDown
        self.podcast1.save()
        self.podcast2.save()

        # both episodes use the same media URL but belong to different
        # podcasts
        self.episode1 = Episode(podcast=self.podcast1.get_id(),
                                urls=['http://example.com/episode1.mp3'])
        self.episode2 = Episode(podcast=self.podcast2.get_id(),
                                urls=['http://example.com/episode1.mp3'])
        self.episode1.save()
        self.episode2.save()

        self.user = User(username='test')
        self.user.email = 'test@example.com'
        self.user.set_password('secret!')
        self.user.save()

    def test_merge_podcasts(self):
        """ Merges both podcasts; episode1's state must hold all actions """

        # Create additional data that will be merged
        state1 = episode_state_for_user_episode(self.user, self.episode1)
        state2 = episode_state_for_user_episode(self.user, self.episode2)

        action1 = EpisodeAction(action='play', timestamp=datetime.utcnow())
        action2 = EpisodeAction(action='download',
                                timestamp=datetime.utcnow())

        state1.add_actions([action1])
        state2.add_actions([action2])

        # NOTE(review): assumes the merger reads the states from the
        # database, so they have to be stored first -- confirm
        state1.save()
        state2.save()

        # fetch a separate copy of episode2 before merging; it is used
        # below to look up the state that belonged to it
        episode2 = episode_by_id(self.episode2._id)

        # decide which episodes to merge
        groups = [(0, [self.episode1, self.episode2])]
        counter = Counter()

        pm = PodcastMerger([self.podcast1, self.podcast2], counter, groups)
        pm.merge()

        # re-fetch the states to see the result of the merge
        state1 = episode_state_for_user_episode(self.user, self.episode1)
        state2 = episode_state_for_user_episode(self.user, episode2)

        # both actions must have ended up in the state of episode1 ...
        self.assertIn(action1, state1.actions)
        self.assertIn(action2, state1.actions)
        # ... and no stored state may be left for episode2
        self.assertIsNone(state2._id)

    def tearDown(self):
        """ Removes the objects that were stored for the test """
        self.podcast1.delete()
        self.episode1.delete()

        # left disabled: presumably these objects are already removed by the
        # merge -- TODO confirm
        #self.podcast2.delete()
        #self.episode2.delete()

        self.user.delete()
class MergeGroupTests(TestCase):
    """ Tests merging of two podcasts, one of which is part of a group """

    def setUp(self):
        """ Stores three podcasts with one episode each, groups two of them """
        self.podcast1 = Podcast(urls=['http://example.com/feed.rss'])
        self.podcast2 = Podcast(urls=['http://test.org/podcast/'])
        self.podcast3 = Podcast(urls=['http://test.org/feed/'])
        # store the fixtures so that they can be looked up by id in the test
        # and deleted again in tearDown
        self.podcast1.save()
        self.podcast2.save()
        self.podcast3.save()

        # episode1 and episode2 use the same media URL but belong to
        # different podcasts
        self.episode1 = Episode(podcast=self.podcast1.get_id(),
                                urls=['http://example.com/episode1.mp3'])
        self.episode2 = Episode(podcast=self.podcast2.get_id(),
                                urls=['http://example.com/episode1.mp3'])
        self.episode3 = Episode(podcast=self.podcast3.get_id(),
                                urls=['http://example.com/media.mp3'])
        self.episode1.save()
        self.episode2.save()
        self.episode3.save()

        # podcast2 and podcast3 become members of one group
        self.podcast2.group_with(self.podcast3, 'My Group', 'Feed1', 'Feed2')

        self.user = User(username='test')
        self.user.email = 'test@example.com'
        self.user.set_password('secret!')
        self.user.save()

    def test_merge_podcasts(self):
        """ Merges the grouped podcast2 with podcast1, checks the result """

        podcast1 = podcast_by_id(self.podcast1.get_id())
        podcast2 = podcast_by_id(self.podcast2.get_id())
        podcast3 = podcast_by_id(self.podcast3.get_id())

        # assert that the podcasts are actually grouped
        self.assertEqual(podcast2._id, podcast3._id)
        self.assertNotEqual(podcast2.get_id(), podcast2._id)
        self.assertNotEqual(podcast3.get_id(), podcast3._id)

        # Create additional data that will be merged
        state1 = episode_state_for_user_episode(self.user, self.episode1)
        state2 = episode_state_for_user_episode(self.user, self.episode2)

        action1 = EpisodeAction(action='play', timestamp=datetime.utcnow())
        action2 = EpisodeAction(action='download',
                                timestamp=datetime.utcnow())

        state1.add_actions([action1])
        state2.add_actions([action2])

        # NOTE(review): assumes the merger reads the states from the
        # database, so they have to be stored first -- confirm
        state1.save()
        state2.save()

        # fetch a separate copy of episode2 before merging; it is used
        # below to look up the state that belonged to it
        episode2 = episode_by_id(self.episode2._id)

        # decide which episodes to merge
        groups = [(0, [self.episode1, self.episode2])]
        counter = Counter()

        pm = PodcastMerger([podcast2, podcast1], counter, groups)
        pm.merge()

        # re-fetch the states to see the result of the merge
        state1 = episode_state_for_user_episode(self.user, self.episode1)
        state2 = episode_state_for_user_episode(self.user, episode2)

        # both actions must have ended up in the state of episode1 ...
        self.assertIn(action1, state1.actions)
        self.assertIn(action2, state1.actions)
        # ... and no stored state may be left for episode2
        self.assertIsNone(state2._id)

        episode1 = episode_by_id(self.episode1._id)

        # episode2 has been merged into episode1, so it must contain its
        # _id in merged_ids
        self.assertEqual(episode1.merged_ids, [episode2._id])

    def tearDown(self):
        """ Removes the objects that were stored for the test """
        self.podcast2.delete()
        self.episode1.delete()

        # NOTE(review): disabled cleanup kept from the original; podcast2 is
        # already deleted above
        #self.podcast2.delete()
        #self.episode2.delete()

        self.user.delete()
# Collect the tests of both merge test cases, in declaration order, into a
# single suite.
suite = unittest.TestSuite()
suite.addTests(
    unittest.TestLoader().loadTestsFromTestCase(test_case)
    for test_case in (MergeTests, MergeGroupTests)
)