2 # This file is part of my.gpodder.org.
4 # my.gpodder.org is free software: you can redistribute it and/or modify it
5 # under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or (at your
7 # option) any later version.
9 # my.gpodder.org is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
12 # License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with my.gpodder.org. If not, see <http://www.gnu.org/licenses/>.
21 from collections
import Counter
23 from django
.test
import TestCase
24 from django
.test
.utils
import override_settings
25 from django
.contrib
.auth
import get_user_model
28 from mygpo
.podcasts
.models
import Podcast
29 from mygpo
.maintenance
.merge
import PodcastMerger
30 from mygpo
.api
.backend
import get_device
31 from mygpo
.users
.models
import Client
, SyncGroup
, UserProxy
32 from mygpo
.db
.couchdb
.podcast_state
import (subscribed_podcast_ids_by_user_id
,
33 subscribe
, unsubscribe
, )
36 class DeviceSyncTests(unittest
.TestCase
):
39 self
.user
= UserProxy(username
='test')
40 self
.user
.email
= 'test@invalid.com'
41 self
.user
.set_password('secret!')
46 dev1
= Client
.objects
.create(id=uuid
.uuid1(), user
=self
.user
, uid
='d1')
47 dev2
= Client
.objects
.create(id=uuid
.uuid1(), user
=self
.user
, uid
='d2')
49 group
= self
.user
.get_grouped_devices().next()
50 self
.assertEquals(group
.is_synced
, False)
51 self
.assertIn(dev1
, group
.devices
)
52 self
.assertIn(dev2
, group
.devices
)
55 dev3
= Client
.objects
.create(id=uuid
.uuid1(), user
=self
.user
, uid
='d3')
59 groups
= self
.user
.get_grouped_devices()
62 self
.assertEquals(g2
.is_synced
, False)
63 self
.assertIn(dev2
, g2
.devices
)
66 self
.assertEquals(g1
.is_synced
, True)
67 self
.assertIn(dev1
, g1
.devices
)
68 self
.assertIn(dev3
, g1
.devices
)
70 targets
= dev1
.get_sync_targets()
71 target
= targets
.next()
72 self
.assertEquals(target
, dev2
)
75 Client
.objects
.filter(user
=self
.user
).delete()
79 @override_settings(CACHE
={})
80 class UnsubscribeMergeTests(TestCase
):
81 """ Test if merged podcasts can be properly unsubscribed
83 TODO: this test fails intermittently """
85 P2_URL
= 'http://test.org/podcast/'
88 self
.podcast1
= Podcast
.objects
.get_or_create_for_url('http://example.com/feed.rss')
89 self
.podcast2
= Podcast
.objects
.get_or_create_for_url(self
.P2_URL
)
91 User
= get_user_model()
92 self
.user
= User(username
='test-merge')
93 self
.user
.email
= 'test@example.com'
94 self
.user
.set_password('secret!')
97 self
.device
= get_device(self
.user
, 'dev', '')
99 def test_merge_podcasts(self
):
100 subscribe(self
.podcast2
, self
.user
, self
.device
)
102 # merge podcast2 into podcast1
103 pm
= PodcastMerger([self
.podcast1
, self
.podcast2
], Counter(), [])
106 # seems that setting delayed_commit = false in the CouchDB config, as
107 # well as a delay here fix the intermittent failures.
108 # TODO: further investiation needed
112 # get podcast for URL of podcast2 and unsubscribe from it
113 p
= Podcast
.objects
.get(urls__url
=self
.P2_URL
)
114 unsubscribe(p
, self
.user
, self
.device
)
116 subscriptions
= subscribed_podcast_ids_by_user_id(self
.user
.profile
.uuid
.hex)
117 self
.assertEqual(0, len(subscriptions
))
120 self
.podcast1
.delete()
125 suite
= unittest
.TestSuite()
126 suite
.addTest(unittest
.TestLoader().loadTestsFromTestCase(DeviceSyncTests
))
127 suite
.addTest(unittest
.TestLoader().loadTestsFromTestCase(UnsubscribeMergeTests
))