2 # This file is part of my.gpodder.org.
4 # my.gpodder.org is free software: you can redistribute it and/or modify it
5 # under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or (at your
7 # option) any later version.
9 # my.gpodder.org is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
12 # License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with my.gpodder.org. If not, see <http://www.gnu.org/licenses/>.
20 from collections
import Counter
22 from django
.core
.urlresolvers
import reverse
23 from django
.test
.client
import Client
as TestClient
24 from django
.test
import TestCase
25 from django
.test
.utils
import override_settings
26 from django
.contrib
.auth
import get_user_model
28 from mygpo
.test
import create_auth_string
, create_user
29 from mygpo
.podcasts
.models
import Podcast
30 from mygpo
.maintenance
.merge
import PodcastMerger
31 from mygpo
.api
.backend
import get_device
32 from mygpo
.users
.models
import Client
, SyncGroup
, UserProxy
33 from mygpo
.subscriptions
import subscribe
, unsubscribe
36 class DeviceSyncTests(unittest
.TestCase
):
39 self
.user
= UserProxy(username
='test')
40 self
.user
.email
= 'test@invalid.com'
41 self
.user
.set_password('secret!')
46 dev1
= Client
.objects
.create(id=uuid
.uuid1(), user
=self
.user
, uid
='d1')
47 dev2
= Client
.objects
.create(id=uuid
.uuid1(), user
=self
.user
, uid
='d2')
49 group
= next(self
.user
.get_grouped_devices())
50 self
.assertEqual(group
.is_synced
, False)
51 self
.assertIn(dev1
, group
.devices
)
52 self
.assertIn(dev2
, group
.devices
)
55 dev3
= Client
.objects
.create(id=uuid
.uuid1(), user
=self
.user
, uid
='d3')
59 groups
= self
.user
.get_grouped_devices()
62 self
.assertEqual(g2
.is_synced
, False)
63 self
.assertIn(dev2
, g2
.devices
)
66 self
.assertEqual(g1
.is_synced
, True)
67 self
.assertIn(dev1
, g1
.devices
)
68 self
.assertIn(dev3
, g1
.devices
)
70 targets
= dev1
.get_sync_targets()
71 target
= next(targets
)
72 self
.assertEqual(target
, dev2
)
75 Client
.objects
.filter(user
=self
.user
).delete()
79 @override_settings(CACHE
={})
80 class UnsubscribeMergeTests(TestCase
):
81 """ Test if merged podcasts can be properly unsubscribed """
83 P2_URL
= 'http://test.org/podcast/'
86 self
.podcast1
= Podcast
.objects
.get_or_create_for_url('http://example.com/feed.rss')
87 self
.podcast2
= Podcast
.objects
.get_or_create_for_url(self
.P2_URL
)
89 User
= get_user_model()
90 self
.user
= User(username
='test-merge')
91 self
.user
.email
= 'test@example.com'
92 self
.user
.set_password('secret!')
95 self
.device
= get_device(self
.user
, 'dev', '')
97 def test_merge_podcasts(self
):
98 subscribe(self
.podcast2
, self
.user
, self
.device
)
100 # merge podcast2 into podcast1
101 pm
= PodcastMerger([self
.podcast1
, self
.podcast2
], Counter(), [])
104 # get podcast for URL of podcast2 and unsubscribe from it
105 p
= Podcast
.objects
.get(urls__url
=self
.P2_URL
)
106 unsubscribe(p
, self
.user
, self
.device
)
108 subscriptions
= Podcast
.objects
.filter(subscription__user
=self
.user
)
109 self
.assertEqual(0, len(subscriptions
))
112 self
.podcast1
.delete()
116 class AuthTests(TestCase
):
119 self
.user
, pwd
= create_user()
120 self
.client
= TestClient()
121 wrong_pwd
= pwd
+ '1234'
123 'HTTP_AUTHORIZATION': create_auth_string(self
.user
.username
,
127 def test_queries_failed_auth(self
):
128 """ Verifies the number of queries that are executed on failed auth """
129 url
= reverse('api-all-subscriptions',
130 args
=(self
.user
.username
, 'opml'))
131 with self
.assertNumQueries(1):
132 resp
= self
.client
.get(url
, **self
.extra
)
133 self
.assertEqual(resp
.status_code
, 401, resp
.content
)
136 def load_tests(loader
, tests
, ignore
):
137 for m
in [DeviceSyncTests
, UnsubscribeMergeTests
, AuthTests
]:
138 tests
.addTest(unittest
.TestLoader().loadTestsFromTestCase(m
))