2 # This file is part of my.gpodder.org.
4 # my.gpodder.org is free software: you can redistribute it and/or modify it
5 # under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or (at your
7 # option) any later version.
9 # my.gpodder.org is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
12 # License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with my.gpodder.org. If not, see <http://www.gnu.org/licenses/>.
21 from collections
import Counter
23 from django
.test
import TestCase
24 from django
.test
.utils
import override_settings
25 from django
.contrib
.auth
import get_user_model
28 from mygpo
.podcasts
.models
import Podcast
29 from mygpo
.maintenance
.merge
import PodcastMerger
30 from mygpo
.api
.backend
import get_device
31 from mygpo
.users
.models
import Client
, SyncGroup
32 from mygpo
.users
.sync
import get_grouped_devices
33 from mygpo
.db
.couchdb
.podcast_state
import (subscribed_podcast_ids_by_user_id
,
34 subscribe
, unsubscribe
, )
# Test case for device sync grouping: it creates Client devices for a test
# user and checks the groups produced by get_grouped_devices() as well as the
# sync targets reported for a device.
# NOTE(review): this listing looks like an incomplete extraction -- the
# embedded numbering skips lines, no `def` headers are visible, and g1/g2 are
# used below without a visible assignment. Confirm against the original file
# before changing any code here.
37 class DeviceSyncTests(unittest
.TestCase
):
# Build the test user (username 'test', e-mail and password set on the
# instance). Presumably the body of setUp(); no save() call is visible in
# this listing -- TODO confirm.
40 User
= get_user_model()
41 self
.user
= User(username
='test')
42 self
.user
.email
= 'test@invalid.com'
43 self
.user
.set_password('secret!')
# Create two devices (uids 'd1' and 'd2') for the user.
48 dev1
= Client
.objects
.create(id=uuid
.uuid1(), user
=self
.user
, uid
='d1')
49 dev2
= Client
.objects
.create(id=uuid
.uuid1(), user
=self
.user
, uid
='d2')
# The first group yielded is expected to be the not-synced one and to
# contain both devices.
# NOTE(review): `.next()` is the Python 2 iterator method; Python 3 needs the
# builtin next(...). `assertEquals` is a deprecated alias of `assertEqual`.
51 group
= get_grouped_devices(self
.user
).next()
52 self
.assertEquals(group
.is_synced
, False)
53 self
.assertIn(dev1
, group
.devices
)
54 self
.assertIn(dev2
, group
.devices
)
# Add a third device (uid 'd3').
# NOTE(review): the assertions below expect dev1 and dev3 to share a synced
# group, but the statement that links them is not visible here -- confirm.
57 dev3
= Client
.objects
.create(id=uuid
.uuid1(), user
=self
.user
, uid
='d3')
61 groups
= get_grouped_devices(self
.user
)
# g2: expected to be not synced and to contain dev2.
# NOTE(review): the binding of g2 is not visible; presumably taken from
# `groups` -- confirm.
64 self
.assertEquals(g2
.is_synced
, False)
65 self
.assertIn(dev2
, g2
.devices
)
# g1: expected to be synced and to contain dev1 and dev3.
# NOTE(review): the binding of g1 is not visible; presumably taken from
# `groups` -- confirm.
68 self
.assertEquals(g1
.is_synced
, True)
69 self
.assertIn(dev1
, g1
.devices
)
70 self
.assertIn(dev3
, g1
.devices
)
# The first sync target reported for dev1 is expected to be dev2.
72 targets
= dev1
.get_sync_targets()
73 target
= targets
.next()
74 self
.assertEquals(target
, dev2
)
# Test case checking that a user can unsubscribe from a podcast after it has
# been passed to PodcastMerger together with another podcast.
# NOTE(review): this listing looks like an incomplete extraction -- the
# embedded numbering skips lines and only the `test_merge_podcasts` header is
# visible. Confirm against the original file before changing any code here.
# NOTE(review): the decorator overrides a setting named CACHE with an empty
# dict; Django's own cache setting is called CACHES -- confirm that CACHE is
# the intended name.
80 @override_settings(CACHE
={})
81 class UnsubscribeMergeTests(TestCase
):
82 """ Test if merged podcasts can be properly unsubscribed
84 TODO: this test fails intermittently """
# URL of the second podcast; also used later to look the podcast up again.
86 P2_URL
= 'http://test.org/podcast/'
# Fixture: two podcasts fetched or created by URL, a user named 'test-merge'
# and a device obtained via get_device(). Presumably the body of setUp(); no
# save() call on the user is visible in this listing -- TODO confirm.
89 self
.podcast1
= Podcast
.objects
.get_or_create_for_url('http://example.com/feed.rss')
90 self
.podcast2
= Podcast
.objects
.get_or_create_for_url(self
.P2_URL
)
92 User
= get_user_model()
93 self
.user
= User(username
='test-merge')
94 self
.user
.email
= 'test@example.com'
95 self
.user
.set_password('secret!')
98 self
.device
= get_device(self
.user
, 'dev', '')
# Subscribe the device to podcast2, hand both podcasts to PodcastMerger, then
# unsubscribe via the podcast found under P2_URL and expect no subscriptions
# to remain for the user.
100 def test_merge_podcasts(self
):
101 subscribe(self
.podcast2
, self
.user
, self
.device
)
103 # merge podcast2 into podcast1
# NOTE(review): `pm` is constructed here but no call on it is visible in this
# listing; presumably the merge is triggered on a line that is not shown --
# confirm.
104 pm
= PodcastMerger([self
.podcast1
, self
.podcast2
], Counter(), [])
107 # seems that setting delayed_commit = false in the CouchDB config, as
108 # well as a delay here fix the intermittent failures.
109 # TODO: further investigation needed
113 # get podcast for URL of podcast2 and unsubscribe from it
114 p
= Podcast
.objects
.get(urls__url
=self
.P2_URL
)
115 unsubscribe(p
, self
.user
, self
.device
)
# The subscription lookup is keyed by the hex form of the profile's UUID.
117 subscriptions
= subscribed_podcast_ids_by_user_id(self
.user
.profile
.uuid
.hex)
118 self
.assertEqual(0, len(subscriptions
))
# Remove podcast1 again. Presumably part of tearDown(); the method header is
# not visible in this listing -- TODO confirm.
121 self
.podcast1
.delete()
# Assemble a unittest.TestSuite holding every test loaded from
# DeviceSyncTests and UnsubscribeMergeTests.
# NOTE(review): presumably the body of a function that returns `suite`;
# neither the enclosing header nor a return statement is visible in this
# listing -- confirm.
126 suite
= unittest
.TestSuite()
127 suite
.addTest(unittest
.TestLoader().loadTestsFromTestCase(DeviceSyncTests
))
128 suite
.addTest(unittest
.TestLoader().loadTestsFromTestCase(UnsubscribeMergeTests
))