remove gevent monkey-patch from feed-downloader
[mygpo.git] / mygpo / admin / tests.py
blob6b5cd184644e92e5765bb1b946a0a045729f53f0
1 """
2 This file demonstrates writing tests using the unittest module. These will pass
3 when you run "manage.py test".
5 Replace this with more appropriate tests for your application.
6 """
8 import time
9 from datetime import datetime
10 from collections import Counter
12 from django.test import TestCase
14 from mygpo.users.models import User, Device, EpisodeAction
15 from mygpo.core.models import Podcast, Episode
16 from mygpo.maintenance.merge import PodcastMerger
17 from mygpo.db.couchdb.episode import episode_by_id, episodes_for_podcast
18 from mygpo.db.couchdb.podcast import podcast_by_id
19 from mygpo.db.couchdb.podcast_state import podcast_state_for_user_podcast
20 from mygpo.db.couchdb.episode_state import episode_state_for_user_episode
21 from mygpo.utils import get_timestamp
24 class SimpleTest(TestCase):
26 def test_merge(self):
28 p1 = Podcast()
29 p1.urls = ['http://example.com/podcast1.rss']
30 p1.save()
32 p2 = Podcast()
33 p2.urls = ['http://example.com/podcast2.rss']
34 p2.save()
37 e1 = Episode()
38 e1.title = 'Episode 1'
39 e1.podcast = p1.get_id()
40 e1.urls = ['http://example.com/podcast1/e1.mp3']
41 e1.save()
43 e2 = Episode()
44 e2.title = 'Episode 2'
45 e2.podcast = p1.get_id()
46 e2.urls = ['http://example.com/podcast1/e2.mp3']
47 e2.save()
49 e3 = Episode()
50 e3.title = 'Episode 3'
51 e3.podcast = p2.get_id()
52 e3.urls = ['http://example.com/podcast2/e2.mp3']
53 e3.save()
55 e4 = Episode()
56 e4.title = 'Episode 4'
57 e4.podcast = p2.get_id()
58 e4.urls = ['http://example.com/podcast2/e3.mp3']
59 e4.save()
61 user = User()
62 user.username = 'user-test_merge'
63 user.email = 'user-test_merge@example.com'
64 user.set_password('secret')
66 device1 = Device()
67 device1.uid = 'dev1'
69 device2 = Device()
70 device2.uid = 'dev2'
72 user.devices.append(device1)
73 user.devices.append(device2)
74 user.save()
77 p1.subscribe(user, device1)
78 time.sleep(1)
79 p1.unsubscribe(user, device1)
80 time.sleep(1)
81 p1.subscribe(user, device1)
82 p2.subscribe(user, device2)
84 s1 = episode_state_for_user_episode(user, e1)
85 s1.add_actions([EpisodeAction(action='play',
86 upload_timestamp=get_timestamp(datetime.utcnow()))])
87 s1.save()
89 s3 = episode_state_for_user_episode(user, e3)
90 s3.add_actions([EpisodeAction(action='play',
91 upload_timestamp=get_timestamp(datetime.utcnow()))])
92 s3.save()
94 # we need that for later
95 e3_id = e3._id
97 actions = Counter()
99 # decide which episodes to merge
100 groups = [(0, [e1]), (1, [e2, e3]), (2, [e4])]
102 # carry out the merge
103 pm = PodcastMerger([p1, p2], actions, groups)
104 pm.merge()
106 e1 = episode_by_id(e1._id)
107 es1 = episode_state_for_user_episode(user, e1)
108 self.assertEqual(len(es1.actions), 1)
110 # check if merged episode's id can still be accessed
111 e3 = episode_by_id(e3_id)
112 es3 = episode_state_for_user_episode(user, e3)
113 self.assertEqual(len(es3.actions), 1)
115 p1 = podcast_by_id(p1.get_id())
116 ps1 = podcast_state_for_user_podcast(user, p1)
117 self.assertEqual(len(ps1.get_subscribed_device_ids()), 2)
119 self.assertEqual(len(list(episodes_for_podcast(p1))), 3)