[Migration] remove CouchDB fulltext index
[mygpo.git] / mygpo / users / tests.py
blob29d38581aece4248a4f3519f8a0475558ccb0f20
2 # This file is part of my.gpodder.org.
4 # my.gpodder.org is free software: you can redistribute it and/or modify it
5 # under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or (at your
7 # option) any later version.
9 # my.gpodder.org is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
12 # License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with my.gpodder.org. If not, see <http://www.gnu.org/licenses/>.
18 import unittest
19 import doctest
20 from collections import Counter
22 from django.test import TestCase
23 from django.test.utils import override_settings
25 import mygpo.utils
26 from mygpo.podcasts.models import Podcast
27 from mygpo.maintenance.merge import PodcastMerger
28 from mygpo.api.backend import get_device
29 from mygpo.users.models import User, Device
30 from mygpo.db.couchdb.podcast_state import (subscribed_podcast_ids_by_user_id,
31 subscribe, unsubscribe, )
class DeviceSyncTests(unittest.TestCase):
    """Tests for grouping and synchronizing the devices of a user."""

    def setUp(self):
        """Create and save the user whose devices the test manipulates."""
        self.user = User(username='test')
        self.user.email = 'test@invalid.com'
        self.user.set_password('secret!')
        self.user.save()

    def test_group(self):
        """Check device groups and sync targets before and after a sync."""
        dev1 = Device(uid='d1')
        self.user.devices.append(dev1)

        dev2 = Device(uid='d2')
        self.user.devices.append(dev2)

        # The next() builtin is used instead of the Python-2-only .next()
        # method so the test works with any iterator on Python 2.6+ and 3.
        group = next(self.user.get_grouped_devices())
        self.assertEqual(group.is_synced, False)
        self.assertIn(dev1, group.devices)
        self.assertIn(dev2, group.devices)

        dev3 = Device(uid='d3')
        self.user.devices.append(dev3)

        self.user.sync_devices(dev1, dev3)

        # After syncing dev1 with dev3, the first group is expected to be the
        # synced one and the second group to hold the remaining device.
        groups = self.user.get_grouped_devices()
        g1 = next(groups)

        self.assertEqual(g1.is_synced, True)
        self.assertIn(dev1, g1.devices)
        self.assertIn(dev3, g1.devices)

        g2 = next(groups)
        self.assertEqual(g2.is_synced, False)
        self.assertIn(dev2, g2.devices)

        # dev2 is the only device not yet synced with dev1, so it is expected
        # to be the first sync target offered for dev1.
        targets = self.user.get_sync_targets(dev1)
        target = next(targets)
        self.assertEqual(target, dev2)

    def tearDown(self):
        """Delete the user created in setUp."""
        self.user.delete()
@override_settings(CACHE={})
class UnsubscribeMergeTests(TestCase):
    """ Verify that a podcast can still be unsubscribed after being merged

    TODO: this test fails intermittently """

    P2_URL = 'http://test.org/podcast/'

    def setUp(self):
        """ Create two podcasts, a saved user and a device for that user """
        podcasts = Podcast.objects
        self.podcast1 = podcasts.get_or_create_for_url('http://example.com/feed.rss')
        self.podcast2 = podcasts.get_or_create_for_url(self.P2_URL)

        user = User(username='test-merge')
        user.email = 'test@example.com'
        user.set_password('secret!')
        user.save()
        self.user = user

        self.device = get_device(user, 'dev', '')

    def test_merge_podcasts(self):
        """ Subscribe to podcast2, merge it into podcast1, then unsubscribe """
        import time

        subscribe(self.podcast2, self.user, self.device)

        # merge podcast2 into podcast1
        merger = PodcastMerger([self.podcast1, self.podcast2], Counter(), [])
        merger.merge()

        # It seems that setting delayed_commit = false in the CouchDB config,
        # together with this delay, fixes the intermittent failures.
        # TODO: further investigation needed
        time.sleep(2)

        # look up the podcast by the URL of podcast2 and unsubscribe from it
        merged_podcast = Podcast.objects.get(urls__url=self.P2_URL)
        unsubscribe(merged_podcast, self.user, self.device)

        # no subscription is expected to remain for the user
        remaining = subscribed_podcast_ids_by_user_id(self.user._id)
        self.assertEqual(0, len(remaining))

    def tearDown(self):
        """ Delete podcast1 and the user created in setUp """
        self.podcast1.delete()
        self.user.delete()
def suite():
    """Return a TestSuite containing the tests of both test cases."""
    loader = unittest.TestLoader()
    test_cases = (DeviceSyncTests, UnsubscribeMergeTests)
    # TestSuite adds each loaded sub-suite in order, one per test case.
    return unittest.TestSuite(
        loader.loadTestsFromTestCase(case) for case in test_cases
    )