2 # This file is part of my.gpodder.org.
4 # my.gpodder.org is free software: you can redistribute it and/or modify it
5 # under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or (at your
7 # option) any later version.
9 # my.gpodder.org is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
12 # License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with my.gpodder.org. If not, see <http://www.gnu.org/licenses/>.
18 from datetime
import datetime
20 from collections
import Counter
22 from django
.test
import TestCase
24 from mygpo
.core
.models
import Podcast
, Episode
25 from mygpo
.users
.models
import EpisodeAction
, User
26 from mygpo
.maintenance
.merge
import PodcastMerger
27 from mygpo
.utils
import get_timestamp
28 from mygpo
.db
.couchdb
.podcast
import podcast_by_id
29 from mygpo
.db
.couchdb
.episode
import episode_by_id
30 from mygpo
.db
.couchdb
.episode_state
import episode_state_for_user_episode
33 class MergeTests(TestCase
):
34 """ Tests merging of two podcasts, their episodes and states """
37 self
.podcast1
= Podcast(urls
=['http://example.com/feed.rss'])
38 self
.podcast2
= Podcast(urls
=['http://test.org/podcast/'])
42 self
.episode1
= Episode(podcast
=self
.podcast1
.get_id(),
43 urls
= ['http://example.com/episode1.mp3'])
44 self
.episode2
= Episode(podcast
=self
.podcast2
.get_id(),
45 urls
= ['http://example.com/episode1.mp3'])
50 self
.user
= User(username
='test-merge')
51 self
.user
.email
= 'test@example.com'
52 self
.user
.set_password('secret!')
56 def test_merge_podcasts(self
):
58 # Create additional data that will be merged
59 state1
= episode_state_for_user_episode(self
.user
, self
.episode1
)
60 state2
= episode_state_for_user_episode(self
.user
, self
.episode2
)
62 action1
= EpisodeAction(action
='play',
63 timestamp
=datetime
.utcnow(),
64 upload_timestamp
=get_timestamp(datetime
.utcnow()))
65 action2
= EpisodeAction(action
='download',
66 timestamp
=datetime
.utcnow(),
67 upload_timestamp
=get_timestamp(datetime
.utcnow()))
69 state1
.add_actions([action1
])
70 state2
.add_actions([action2
])
76 episode2
= episode_by_id(self
.episode2
._id
)
78 # decide which episodes to merge
79 groups
= [(0, [self
.episode1
, self
.episode2
])]
82 pm
= PodcastMerger([self
.podcast1
, self
.podcast2
], counter
, groups
)
85 state1
= episode_state_for_user_episode(self
.user
, self
.episode1
)
86 state2
= episode_state_for_user_episode(self
.user
, episode2
)
88 self
.assertIn(action1
, state1
.actions
)
89 self
.assertIn(action2
, state1
.actions
)
90 self
.assertEqual(state2
._id
, None)
95 self
.podcast1
.delete()
96 self
.episode1
.delete()
98 #self.podcast2.delete()
99 #self.episode2.delete()
105 class MergeGroupTests(TestCase
):
106 """ Tests merging of two podcasts, one of which is part of a group """
109 self
.podcast1
= Podcast(urls
=['http://example.com/feed.rss'])
110 self
.podcast2
= Podcast(urls
=['http://test.org/podcast/'])
111 self
.podcast3
= Podcast(urls
=['http://test.org/feed/'])
116 self
.episode1
= Episode(podcast
=self
.podcast1
.get_id(),
117 urls
= ['http://example.com/episode1.mp3'])
118 self
.episode2
= Episode(podcast
=self
.podcast2
.get_id(),
119 urls
= ['http://example.com/episode1.mp3'])
120 self
.episode3
= Episode(podcast
=self
.podcast3
.get_id(),
121 urls
= ['http://example.com/media.mp3'])
128 self
.podcast2
.group_with(self
.podcast3
, 'My Group', 'Feed1', 'Feed2')
130 self
.user
= User(username
='test-merge-group')
131 self
.user
.email
= 'test@example.com'
132 self
.user
.set_password('secret!')
136 def test_merge_podcasts(self
):
138 podcast1
= podcast_by_id(self
.podcast1
.get_id())
139 podcast2
= podcast_by_id(self
.podcast2
.get_id())
140 podcast3
= podcast_by_id(self
.podcast3
.get_id())
142 # assert that the podcasts are actually grouped
143 self
.assertEqual(podcast2
._id
, podcast3
._id
)
144 self
.assertNotEqual(podcast2
.get_id(), podcast2
._id
)
145 self
.assertNotEqual(podcast3
.get_id(), podcast3
._id
)
147 # Create additional data that will be merged
148 state1
= episode_state_for_user_episode(self
.user
, self
.episode1
)
149 state2
= episode_state_for_user_episode(self
.user
, self
.episode2
)
151 action1
= EpisodeAction(action
='play',
152 timestamp
=datetime
.utcnow(),
153 upload_timestamp
=get_timestamp(datetime
.utcnow()))
154 action2
= EpisodeAction(action
='download',
155 timestamp
=datetime
.utcnow(),
156 upload_timestamp
=get_timestamp(datetime
.utcnow()))
158 state1
.add_actions([action1
])
159 state2
.add_actions([action2
])
165 episode2
= episode_by_id(self
.episode2
._id
)
167 # decide which episodes to merge
168 groups
= [(0, [self
.episode1
, self
.episode2
])]
171 pm
= PodcastMerger([podcast2
, podcast1
], counter
, groups
)
174 state1
= episode_state_for_user_episode(self
.user
, self
.episode1
)
175 state2
= episode_state_for_user_episode(self
.user
, episode2
)
177 self
.assertIn(action1
, state1
.actions
)
178 self
.assertIn(action2
, state1
.actions
)
179 self
.assertEqual(state2
._id
, None)
181 episode1
= episode_by_id(self
.episode1
._id
)
183 # episode2 has been merged into episode1, so it must contain its
185 self
.assertEqual(episode1
.merged_ids
, [episode2
._id
])
190 self
.podcast2
.delete()
191 self
.episode1
.delete()
193 #self.podcast2.delete()
194 #self.episode2.delete()
200 suite
= unittest
.TestSuite()
201 suite
.addTest(unittest
.TestLoader().loadTestsFromTestCase(MergeTests
))
202 suite
.addTest(unittest
.TestLoader().loadTestsFromTestCase(MergeGroupTests
))