refactor / optimize device updates
author Stefan Kögl <stefan@skoegl.net>
Tue, 23 Jul 2013 19:35:42 +0000 (23 21:35 +0200)
committer Stefan Kögl <stefan@skoegl.net>
Tue, 23 Jul 2013 19:35:42 +0000 (23 21:35 +0200)
mygpo/api/advanced/__init__.py
mygpo/api/advanced/updates.py [new file with mode: 0644]
mygpo/api/urls.py
mygpo/users/models.py
mygpo/users/subscriptions.py

index c62d764..0265a00 100644 (file)
 #
 
 from functools import partial
-from itertools import imap, chain
+from itertools import imap
 from collections import defaultdict, namedtuple
 from datetime import datetime
 from importlib import import_module
 
 import dateutil.parser
 
-try:
-    import gevent
-except ImportError:
-    gevent = None
-
 from django.http import HttpResponse, HttpResponseBadRequest, Http404, HttpResponseNotFound
 from django.contrib.sites.models import RequestSite
 from django.views.decorators.csrf import csrf_exempt
 from django.views.decorators.cache import never_cache
-from django.utils.decorators import method_decorator
-from django.views.generic.base import View
 from django.conf import settings as dsettings
 
 from mygpo.api.constants import EPISODE_ACTION_TYPES, DEVICE_TYPES
 from mygpo.api.httpresponse import JsonResponse
-from mygpo.api.advanced.directory import episode_data, podcast_data
+from mygpo.api.advanced.directory import episode_data
 from mygpo.api.backend import get_device, BulkSubscribe
-from mygpo.utils import parse_time, format_time, parse_bool, get_timestamp, \
+from mygpo.utils import format_time, parse_bool, get_timestamp, \
     parse_request_body, normalize_feed_url
-from mygpo.decorators import allowed_methods, repeat_on_conflict
-from mygpo.core import models
+from mygpo.decorators import allowed_methods
 from mygpo.core.tasks import auto_flattr_episode
-from mygpo.users.models import PodcastUserState, EpisodeAction, \
-     EpisodeUserState, DeviceDoesNotExist, DeviceUIDException, \
+from mygpo.users.models import EpisodeAction, \
+     DeviceDoesNotExist, DeviceUIDException, \
      InvalidEpisodeActionAttributes
 from mygpo.users.settings import FLATTR_AUTO
 from mygpo.core.json import JSONDecodeError
 from mygpo.api.basic_auth import require_valid_user, check_username
 from mygpo.db.couchdb import BulkException, bulk_save_retry
-from mygpo.db.couchdb.episode import episode_by_id, \
-         favorite_episodes_for_user, episodes_for_podcast
+from mygpo.db.couchdb.episode import favorite_episodes_for_user
 from mygpo.db.couchdb.podcast import podcast_for_url
 from mygpo.db.couchdb.podcast_state import subscribed_podcast_ids_by_device
-from mygpo.db.couchdb.episode_state import get_podcasts_episode_states, \
-         episode_state_for_ref_urls, get_episode_actions
+from mygpo.db.couchdb.episode_state import episode_state_for_ref_urls, \
+    get_episode_actions
 
 
 import logging
@@ -505,129 +496,6 @@ def device_data(device):
     )
 
 
-
-def get_podcast_data(podcasts, domain, url):
-    """ Gets podcast data for a URL from a dict of podcasts """
-    podcast = podcasts.get(url)
-    return podcast_data(podcast, domain)
-
-
-def get_episode_data(podcasts, domain, clean_action_data, include_actions, episode_status):
-    """ Get episode data for an episode status object """
-    podcast_id = episode_status.episode.podcast
-    podcast = podcasts.get(podcast_id, None)
-    t = episode_data(episode_status.episode, domain, podcast)
-    t['status'] = episode_status.status
-
-    # include latest action (bug 1419)
-    if include_actions and episode_status.action:
-        t['action'] = clean_action_data(episode_status.action)
-
-    return t
-
-
-
-class DeviceUpdates(View):
-
-    @method_decorator(csrf_exempt)
-    @method_decorator(require_valid_user)
-    @method_decorator(check_username)
-    @method_decorator(never_cache)
-    def get(self, request, username, device_uid):
-        now = datetime.now()
-        now_ = get_timestamp(now)
-
-        try:
-            device = request.user.get_device_by_uid(device_uid)
-        except DeviceDoesNotExist as e:
-            return HttpResponseNotFound(str(e))
-
-        since_ = request.GET.get('since', None)
-        if since_ is None:
-            return HttpResponseBadRequest('parameter since missing')
-        try:
-            since = datetime.fromtimestamp(float(since_))
-        except ValueError:
-            return HttpResponseBadRequest("'since' is not a valid timestamp")
-
-        include_actions = parse_bool(request.GET.get('include_actions', False))
-
-        ret = get_subscription_changes(request.user, device, since, now)
-        domain = RequestSite(request).domain
-
-        subscriptions = list(device.get_subscribed_podcasts())
-
-        podcasts = dict( (p.url, p) for p in subscriptions )
-        prepare_podcast_data = partial(get_podcast_data, podcasts, domain)
-
-        ret['add'] = map(prepare_podcast_data, ret['add'])
-
-        devices = dict( (dev.id, dev.uid) for dev in request.user.devices )
-        clean_action_data = partial(clean_episode_action_data,
-                user=request.user, devices=devices)
-
-        # index subscribed podcasts by their Id for fast access
-        podcasts = dict( (p.get_id(), p) for p in subscriptions )
-        prepare_episode_data = partial(get_episode_data, podcasts, domain,
-                clean_action_data, include_actions)
-
-        episode_updates = self.get_episode_updates(request.user,
-                subscriptions, since)
-        ret['updates'] = map(prepare_episode_data, episode_updates)
-
-        return JsonResponse(ret)
-
-
-    def get_episode_updates(self, user, subscribed_podcasts, since,
-            max_per_podcast=5):
-        """ Returns the episode updates since the timestamp """
-
-        EpisodeStatus = namedtuple('EpisodeStatus', 'episode status action')
-
-        episode_status = {}
-
-        # get episodes
-        if gevent:
-            episode_jobs = [gevent.spawn(episodes_for_podcast, p, since,
-                    limit=max_per_podcast) for p in subscribed_podcasts]
-            gevent.joinall(episode_jobs)
-            episodes = chain.from_iterable(job.get() for job in episode_jobs)
-
-        else:
-            episodes = chain.from_iterable(episodes_for_podcast(p, since,
-                    limit=max_per_podcast) for p in subscribed_podcasts)
-
-
-        for episode in episodes:
-            episode_status[episode._id] = EpisodeStatus(episode, 'new', None)
-
-
-        # get episode states
-        if gevent:
-            e_action_jobs = [gevent.spawn(get_podcasts_episode_states, p,
-                    user._id) for p in subscribed_podcasts]
-            gevent.joinall(e_action_jobs)
-            e_actions = chain.from_iterable(job.get() for job in e_action_jobs)
-
-        else:
-            e_actions = chain.from_iterable(get_podcasts_episode_states(p,
-                    user._id) for p in subscribed_podcasts)
-
-
-        for action in e_actions:
-            e_id = action['episode_id']
-
-            if e_id in episode_status:
-                episode = episode_status[e_id].episode
-            else:
-                episode = episode_by_id(e_id)
-
-            episode_status[e_id] = EpisodeStatus(episode, action['action'],
-                    action)
-
-        return episode_status.itervalues()
-
-
 @require_valid_user
 @check_username
 @never_cache
diff --git a/mygpo/api/advanced/updates.py b/mygpo/api/advanced/updates.py
new file mode 100644 (file)
index 0000000..adaa5ac
--- /dev/null
@@ -0,0 +1,189 @@
+#
+# This file is part of my.gpodder.org.
+#
+# my.gpodder.org is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or (at your
+# option) any later version.
+#
+# my.gpodder.org is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
+# License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with my.gpodder.org. If not, see <http://www.gnu.org/licenses/>.
+#
+
+from itertools import chain
+from datetime import datetime
+
+try:
+    import gevent
+except ImportError:
+    gevent = None
+
+from django.http import HttpResponseBadRequest, HttpResponseNotFound
+from django.contrib.sites.models import RequestSite
+from django.views.decorators.csrf import csrf_exempt
+from django.views.decorators.cache import never_cache
+from django.utils.decorators import method_decorator
+from django.views.generic.base import View
+
+from mygpo.api.httpresponse import JsonResponse
+from mygpo.api.advanced import clean_episode_action_data
+from mygpo.api.advanced.directory import episode_data, podcast_data
+from mygpo.utils import parse_bool, get_timestamp
+from mygpo.users.models import DeviceDoesNotExist
+from mygpo.users.subscriptions import subscription_changes, podcasts_for_states
+from mygpo.api.basic_auth import require_valid_user, check_username
+from mygpo.db.couchdb.episode import episodes_for_podcast
+from mygpo.db.couchdb.episode_state import get_podcasts_episode_states
+from mygpo.db.couchdb.podcast_state import podcast_states_for_device
+
+from collections import namedtuple
+# Per-episode record produced by DeviceUpdates.get_episode_updates():
+#   episode - the episode object
+#   status  - 'new', or the 'action' value of an episode action
+#   action  - the episode action that set the status (None for 'new')
+EpisodeStatus = namedtuple('EpisodeStatus', 'episode status action')
+
+import logging
+logger = logging.getLogger(__name__)
+
+
+class DeviceUpdates(View):
+    """ returns various updates for a device
+
+    Handles GET requests: reports the subscriptions added to and removed
+    from the device since a client-supplied timestamp, together with
+    episode status updates for the podcasts currently subscribed on the
+    device.
+
+    http://wiki.gpodder.org/wiki/Web_Services/API_2/Devices#Get_Updates """
+
+    @method_decorator(csrf_exempt)
+    @method_decorator(require_valid_user)
+    @method_decorator(check_username)
+    @method_decorator(never_cache)
+    def get(self, request, username, device_uid):
+        """ handles the request and returns the updates as JSON
+
+        Returns a 404 response if the user has no device with the given
+        UID, and a 400 response if the "since" parameter is missing or is
+        not a valid timestamp. """
+
+        # single reference time: used as the upper bound of the subscription
+        # diff and reported back as the response's 'timestamp'
+        now = datetime.now()
+
+        user = request.user
+
+        try:
+            device = user.get_device_by_uid(device_uid)
+        except DeviceDoesNotExist as e:
+            return HttpResponseNotFound(str(e))
+
+        try:
+            since = self.get_since(request)
+        except ValueError as e:
+            return HttpResponseBadRequest(str(e))
+
+        include_actions = parse_bool(request.GET.get('include_actions', False))
+
+        domain = RequestSite(request).domain
+
+        add, rem, subscriptions = self.get_subscription_changes(device, since,
+                                                                now, domain)
+        updates = self.get_episode_changes(user, subscriptions, domain,
+                                           include_actions, since)
+
+        return JsonResponse({
+            'add': add,
+            'rem': rem,
+            'updates': updates,
+            'timestamp': get_timestamp(now),
+        })
+
+
+    def get_subscription_changes(self, device, since, now, domain):
+        """ gets new, removed and current subscriptions
+
+        Returns a tuple (add, rem, subscriptions):
+         * add: podcast data for the URLs that subscription_changes()
+           reports as subscribed between since and now
+         * rem: the URLs it reports as unsubscribed in that period
+         * subscriptions: the podcasts currently subscribed on the device
+        """
+
+        # DB: get all podcast states for the device
+        # materialized because the states are iterated more than once below
+        podcast_states = list(podcast_states_for_device(device.id))
+
+        add, rem = subscription_changes(device.id, podcast_states, since, now)
+
+        # a real list is required: podcasts_for_states() walks its argument
+        # twice, which a one-shot iterator would not survive
+        subscribed_states = [state for state in podcast_states
+                             if state.is_subscribed_on(device)]
+
+        # DB: get podcast objects for the subscribed podcasts
+        subscriptions = podcasts_for_states(subscribed_states)
+
+        # index by URL to find the podcast behind each added URL
+        podcasts = dict( (p.url, p) for p in subscriptions )
+        add = [podcast_data(podcasts.get(url), domain) for url in add]
+
+        return add, rem, subscriptions
+
+
+    def get_episode_changes(self, user, subscriptions, domain, include_actions, since):
+        """ returns the episode data for all episode updates since "since"
+
+        The result is a list with one entry per episode status returned by
+        get_episode_updates(), each built by get_episode_data(). """
+
+        # maps device Ids to device UIDs; handed on to
+        # clean_episode_action_data() when actions are included
+        devices = dict( (dev.id, dev.uid) for dev in user.devices )
+
+        # index subscribed podcasts by their Id for fast access
+        podcasts = dict( (p.get_id(), p) for p in subscriptions )
+
+        episode_updates = self.get_episode_updates(user, subscriptions, since)
+
+        return [self.get_episode_data(status, podcasts, domain,
+                include_actions, user, devices) for status in episode_updates]
+
+
+    def get_episode_updates(self, user, subscribed_podcasts, since,
+            max_per_podcast=5):
+        """ Returns the episode updates since the timestamp
+
+        For every subscribed podcast, episodes are fetched with
+        max_per_podcast passed as the query limit. Each episode starts out
+        with status 'new'; an episode action of the user for one of these
+        episodes replaces that status. Returns an iterator of
+        EpisodeStatus tuples. """
+
+        if gevent:
+            # DB: get max_per_podcast episodes for each subscribed podcast
+            episode_jobs = [gevent.spawn(episodes_for_podcast, p, since,
+                    limit=max_per_podcast) for p in subscribed_podcasts]
+            gevent.joinall(episode_jobs)
+            episodes = chain.from_iterable(job.get() for job in episode_jobs)
+
+            # DB: get all episode states for all subscribed podcasts
+            e_action_jobs = [gevent.spawn(get_podcasts_episode_states, p,
+                    user._id) for p in subscribed_podcasts]
+            gevent.joinall(e_action_jobs)
+            e_actions = chain.from_iterable(job.get() for job in e_action_jobs)
+
+        else:
+            # without gevent the same queries run one after another
+            episodes = chain.from_iterable(episodes_for_podcast(p, since,
+                    limit=max_per_podcast) for p in subscribed_podcasts)
+
+            e_actions = chain.from_iterable(get_podcasts_episode_states(p,
+                    user._id) for p in subscribed_podcasts)
+
+        # TODO: get_podcasts_episode_states could be optimized by returning
+        # only actions within some time frame
+
+        e_status = { e._id: EpisodeStatus(e, 'new', None) for e in episodes}
+
+        for action in e_actions:
+            e_id = action['episode_id']
+
+            # actions for episodes outside the fetched set are ignored
+            if e_id not in e_status:
+                continue
+
+            episode = e_status[e_id].episode
+
+            # if several actions refer to the same episode, the one
+            # iterated last wins
+            e_status[e_id] = EpisodeStatus(episode, action['action'], action)
+
+        return e_status.itervalues()
+
+
+    def get_episode_data(self, episode_status, podcasts, domain, include_actions, user, devices):
+        """ Get episode data for an episode status object
+
+        podcasts maps podcast Ids to podcasts; devices maps device Ids to
+        device UIDs. The returned episode data carries the status and, if
+        include_actions is set and the status has an action, the cleaned
+        action under the 'action' key. """
+
+        # TODO: shouldn't the podcast_id be in the episode status?
+        podcast_id = episode_status.episode.podcast
+        podcast = podcasts.get(podcast_id, None)
+        t = episode_data(episode_status.episode, domain, podcast)
+        t['status'] = episode_status.status
+
+        # include latest action (bug 1419)
+        if include_actions and episode_status.action:
+            t['action'] = clean_episode_action_data(episode_status.action, user, devices)
+
+        return t
+
+    def get_since(self, request):
+        """ parses the "since" parameter
+
+        Returns the parameter converted to a datetime. Raises ValueError
+        if the parameter is missing, is not a number, or cannot be
+        converted to a timestamp. """
+        since_ = request.GET.get('since', None)
+        if since_ is None:
+            raise ValueError('parameter since missing')
+        try:
+            return datetime.fromtimestamp(float(since_))
+        except (ValueError, OverflowError, OSError) as e:
+            # Python 3 reports infinite or out-of-range timestamps as
+            # OverflowError / OSError; normalise them so that callers only
+            # have to handle ValueError
+            raise ValueError("'since' is not a valid timestamp: %s" % str(e))
index a8769ba..33a18a6 100644 (file)
@@ -16,7 +16,7 @@ urlpatterns += patterns('mygpo.api.simple',
  url(r'^gpodder-examples\.(?P<format>\w+)$', 'example_podcasts',               name='example-opml'),
 )
 
-from mygpo.api.advanced import DeviceUpdates
+from mygpo.api.advanced.updates import DeviceUpdates
 
 urlpatterns += patterns('mygpo.api.advanced',
     (r'^api/[12]/subscriptions/(?P<username>[\w.-]+)/(?P<device_uid>[\w.-]+)\.json', 'subscriptions'),
index 6900ece..85cb816 100644 (file)
@@ -19,6 +19,7 @@ from mygpo.core.proxy import DocumentABCMeta, proxy_object
 from mygpo.decorators import repeat_on_conflict
 from mygpo.users.ratings import RatingMixin
 from mygpo.users.sync import SyncedDevicesMixin
+from mygpo.users.subscriptions import subscription_changes, podcasts_for_states
 from mygpo.users.settings import FAV_FLAG, PUBLIC_SUB_PODCAST, SettingsMixin
 from mygpo.db.couchdb.podcast import podcasts_by_id, podcasts_to_dict
 from mygpo.db.couchdb.user import user_history, device_history
@@ -352,6 +353,19 @@ class PodcastUserState(Document, SettingsMixin):
         return devices
 
 
+    def is_subscribed_on(self, device):
+        """ tells whether this podcast is currently subscribed on the device
+
+        Only the most recent action recorded for the device counts. If
+        there is no action for the device at all, the podcast is not
+        subscribed there. """
+
+        # walk the actions newest-first and stop at the first one that
+        # belongs to the device
+        latest = next((entry for entry in reversed(self.actions)
+                       if entry.device == device.id), None)
+
+        if latest is None:
+            # no action refers to this device
+            return False
+
+        return latest.action == 'subscribe'
+
 
     def is_public(self):
         return self.get_wksetting(PUBLIC_SUB_PODCAST)
@@ -388,17 +402,8 @@ class Device(Document, SettingsMixin):
         """
 
         from mygpo.db.couchdb.podcast_state import podcast_states_for_device
-
-        add, rem = [], []
         podcast_states = podcast_states_for_device(self.id)
-        for p_state in podcast_states:
-            change = p_state.get_change_between(self.id, since, until)
-            if change == 'subscribe':
-                add.append( p_state.ref_url )
-            elif change == 'unsubscribe':
-                rem.append( p_state.ref_url )
-
-        return add, rem
+        return subscription_changes(self.id, podcast_states, since, until)
 
 
     def get_latest_changes(self):
@@ -433,14 +438,7 @@ class Device(Document, SettingsMixin):
         the podcast """
 
         states = self.get_subscribed_podcast_states()
-        podcast_ids = [state.podcast for state in states]
-        podcasts = podcasts_to_dict(podcast_ids)
-
-        for state in states:
-            podcast = proxy_object(podcasts[state.podcast], url=state.ref_url)
-            podcasts[state.podcast] = podcast
-
-        return podcasts.values()
+        return podcasts_for_states(states)
 
 
     def __hash__(self):
index 36e5037..e15f5e9 100644 (file)
@@ -1,4 +1,6 @@
+from mygpo.core.proxy import proxy_object
 from mygpo.db.couchdb.user import get_num_listened_episodes
+from mygpo.db.couchdb.podcast import podcasts_to_dict
 
 
 class PodcastSorter(object):
@@ -58,3 +60,30 @@ class PodcastPercentageListenedSorter(PodcastSorter):
                 podcast.episodes_listened = 0
 
         return sorted(self.podcasts, key=SORT_KEY, reverse=True)
+
+
+def subscription_changes(device_id, podcast_states, since, until):
+    """ collects the subscription changes of a device
+
+    Asks every podcast state for its change on the device between since
+    and until. Returns a tuple (add, rem) with the URLs of the states
+    whose change is 'subscribe' and 'unsubscribe' respectively. """
+
+    # query each state exactly once, remembering which state it came from
+    changes = [(state.get_change_between(device_id, since, until), state)
+               for state in podcast_states]
+
+    subscribed = [state.ref_url for change, state in changes
+                  if change == 'subscribe']
+    unsubscribed = [state.ref_url for change, state in changes
+                    if change == 'unsubscribe']
+
+    return subscribed, unsubscribed
+
+
+def podcasts_for_states(podcast_states):
+    """ loads the podcasts that the given podcast states refer to
+
+    Each podcast is wrapped in a proxy object whose url is the ref_url of
+    its podcast state. """
+
+    by_id = podcasts_to_dict([state.podcast for state in podcast_states])
+
+    # swap every podcast for a proxy carrying the URL stored in its state
+    for state in podcast_states:
+        key = state.podcast
+        by_id[key] = proxy_object(by_id[key], url=state.ref_url)
+
+    return by_id.values()