From e5d2aa72325d3799d3822be18853c5c10471961a Mon Sep 17 00:00:00 2001 From: =?utf8?q?Stefan=20K=C3=B6gl?= Date: Tue, 23 Jul 2013 21:35:42 +0200 Subject: [PATCH] refactor / optimize device updates --- mygpo/api/advanced/__init__.py | 150 ++------------------------------ mygpo/api/advanced/updates.py | 189 +++++++++++++++++++++++++++++++++++++++++ mygpo/api/urls.py | 2 +- mygpo/users/models.py | 34 ++++---- mygpo/users/subscriptions.py | 29 +++++++ 5 files changed, 244 insertions(+), 160 deletions(-) create mode 100644 mygpo/api/advanced/updates.py diff --git a/mygpo/api/advanced/__init__.py b/mygpo/api/advanced/__init__.py index c62d7640..0265a00d 100644 --- a/mygpo/api/advanced/__init__.py +++ b/mygpo/api/advanced/__init__.py @@ -16,48 +16,39 @@ # from functools import partial -from itertools import imap, chain +from itertools import imap from collections import defaultdict, namedtuple from datetime import datetime from importlib import import_module import dateutil.parser -try: - import gevent -except ImportError: - gevent = None - from django.http import HttpResponse, HttpResponseBadRequest, Http404, HttpResponseNotFound from django.contrib.sites.models import RequestSite from django.views.decorators.csrf import csrf_exempt from django.views.decorators.cache import never_cache -from django.utils.decorators import method_decorator -from django.views.generic.base import View from django.conf import settings as dsettings from mygpo.api.constants import EPISODE_ACTION_TYPES, DEVICE_TYPES from mygpo.api.httpresponse import JsonResponse -from mygpo.api.advanced.directory import episode_data, podcast_data +from mygpo.api.advanced.directory import episode_data from mygpo.api.backend import get_device, BulkSubscribe -from mygpo.utils import parse_time, format_time, parse_bool, get_timestamp, \ +from mygpo.utils import format_time, parse_bool, get_timestamp, \ parse_request_body, normalize_feed_url -from mygpo.decorators import allowed_methods, repeat_on_conflict -from mygpo.core import models +from mygpo.decorators import allowed_methods from mygpo.core.tasks import auto_flattr_episode -from mygpo.users.models import PodcastUserState, EpisodeAction, \ - EpisodeUserState, DeviceDoesNotExist, DeviceUIDException, \ +from mygpo.users.models import EpisodeAction, \ + DeviceDoesNotExist, DeviceUIDException, \ InvalidEpisodeActionAttributes from mygpo.users.settings import FLATTR_AUTO from mygpo.core.json import JSONDecodeError from mygpo.api.basic_auth import require_valid_user, check_username from mygpo.db.couchdb import BulkException, bulk_save_retry -from mygpo.db.couchdb.episode import episode_by_id, \ - favorite_episodes_for_user, episodes_for_podcast +from mygpo.db.couchdb.episode import favorite_episodes_for_user from mygpo.db.couchdb.podcast import podcast_for_url from mygpo.db.couchdb.podcast_state import subscribed_podcast_ids_by_device -from mygpo.db.couchdb.episode_state import get_podcasts_episode_states, \ - episode_state_for_ref_urls, get_episode_actions +from mygpo.db.couchdb.episode_state import episode_state_for_ref_urls, \ + get_episode_actions import logging @@ -505,129 +496,6 @@ def device_data(device): ) - -def get_podcast_data(podcasts, domain, url): - """ Gets podcast data for a URL from a dict of podcasts """ - podcast = podcasts.get(url) - return podcast_data(podcast, domain) - - -def get_episode_data(podcasts, domain, clean_action_data, include_actions, episode_status): - """ Get episode data for an episode status object """ - podcast_id = episode_status.episode.podcast - podcast = podcasts.get(podcast_id, None) - t = episode_data(episode_status.episode, domain, podcast) - t['status'] = episode_status.status - - # include latest action (bug 1419) - if include_actions and episode_status.action: - t['action'] = clean_action_data(episode_status.action) - - return t - - - -class DeviceUpdates(View): - - @method_decorator(csrf_exempt) - @method_decorator(require_valid_user) - @method_decorator(check_username) - @method_decorator(never_cache) - def get(self, request, username, device_uid): - now = datetime.now() - now_ = get_timestamp(now) - - try: - device = request.user.get_device_by_uid(device_uid) - except DeviceDoesNotExist as e: - return HttpResponseNotFound(str(e)) - - since_ = request.GET.get('since', None) - if since_ is None: - return HttpResponseBadRequest('parameter since missing') - try: - since = datetime.fromtimestamp(float(since_)) - except ValueError: - return HttpResponseBadRequest("'since' is not a valid timestamp") - - include_actions = parse_bool(request.GET.get('include_actions', False)) - - ret = get_subscription_changes(request.user, device, since, now) - domain = RequestSite(request).domain - - subscriptions = list(device.get_subscribed_podcasts()) - - podcasts = dict( (p.url, p) for p in subscriptions ) - prepare_podcast_data = partial(get_podcast_data, podcasts, domain) - - ret['add'] = map(prepare_podcast_data, ret['add']) - - devices = dict( (dev.id, dev.uid) for dev in request.user.devices ) - clean_action_data = partial(clean_episode_action_data, - user=request.user, devices=devices) - - # index subscribed podcasts by their Id for fast access - podcasts = dict( (p.get_id(), p) for p in subscriptions ) - prepare_episode_data = partial(get_episode_data, podcasts, domain, - clean_action_data, include_actions) - - episode_updates = self.get_episode_updates(request.user, - subscriptions, since) - ret['updates'] = map(prepare_episode_data, episode_updates) - - return JsonResponse(ret) - - - def get_episode_updates(self, user, subscribed_podcasts, since, - max_per_podcast=5): - """ Returns the episode updates since the timestamp """ - - EpisodeStatus = namedtuple('EpisodeStatus', 'episode status action') - - episode_status = {} - - # get episodes - if gevent: - episode_jobs = [gevent.spawn(episodes_for_podcast, p, since, - limit=max_per_podcast) for p in subscribed_podcasts] - gevent.joinall(episode_jobs) - episodes = chain.from_iterable(job.get() for job in episode_jobs) - - else: - episodes = chain.from_iterable(episodes_for_podcast(p, since, - limit=max_per_podcast) for p in subscribed_podcasts) - - - for episode in episodes: - episode_status[episode._id] = EpisodeStatus(episode, 'new', None) - - - # get episode states - if gevent: - e_action_jobs = [gevent.spawn(get_podcasts_episode_states, p, - user._id) for p in subscribed_podcasts] - gevent.joinall(e_action_jobs) - e_actions = chain.from_iterable(job.get() for job in e_action_jobs) - - else: - e_actions = chain.from_iterable(get_podcasts_episode_states(p, - user._id) for p in subscribed_podcasts) - - - for action in e_actions: - e_id = action['episode_id'] - - if e_id in episode_status: - episode = episode_status[e_id].episode - else: - episode = episode_by_id(e_id) - - episode_status[e_id] = EpisodeStatus(episode, action['action'], - action) - - return episode_status.itervalues() - - @require_valid_user @check_username @never_cache diff --git a/mygpo/api/advanced/updates.py b/mygpo/api/advanced/updates.py new file mode 100644 index 00000000..adaa5ac9 --- /dev/null +++ b/mygpo/api/advanced/updates.py @@ -0,0 +1,189 @@ +# +# This file is part of my.gpodder.org. +# +# my.gpodder.org is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. +# +# my.gpodder.org is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public +# License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with my.gpodder.org. If not, see . +# + +from itertools import chain +from datetime import datetime + +try: + import gevent +except ImportError: + gevent = None + +from django.http import HttpResponseBadRequest, HttpResponseNotFound +from django.contrib.sites.models import RequestSite +from django.views.decorators.csrf import csrf_exempt +from django.views.decorators.cache import never_cache +from django.utils.decorators import method_decorator +from django.views.generic.base import View + +from mygpo.api.httpresponse import JsonResponse +from mygpo.api.advanced import clean_episode_action_data +from mygpo.api.advanced.directory import episode_data, podcast_data +from mygpo.utils import parse_bool, get_timestamp +from mygpo.users.models import DeviceDoesNotExist +from mygpo.users.subscriptions import subscription_changes, podcasts_for_states +from mygpo.api.basic_auth import require_valid_user, check_username +from mygpo.db.couchdb.episode import episodes_for_podcast +from mygpo.db.couchdb.episode_state import get_podcasts_episode_states +from mygpo.db.couchdb.podcast_state import podcast_states_for_device + +from collections import namedtuple +EpisodeStatus = namedtuple('EpisodeStatus', 'episode status action') + +import logging +logger = logging.getLogger(__name__) + + +class DeviceUpdates(View): + """ returns various updates for a device + + http://wiki.gpodder.org/wiki/Web_Services/API_2/Devices#Get_Updates """ + + @method_decorator(csrf_exempt) + @method_decorator(require_valid_user) + @method_decorator(check_username) + @method_decorator(never_cache) + def get(self, request, username, device_uid): + + now = datetime.now() + now_ = get_timestamp(now) + + user = request.user + + try: + device = user.get_device_by_uid(device_uid) + except DeviceDoesNotExist as e: + return HttpResponseNotFound(str(e)) + + try: + since = self.get_since(request) + except ValueError as e: + return HttpResponseBadRequest(str(e)) + + include_actions = parse_bool(request.GET.get('include_actions', False)) + + domain = RequestSite(request).domain + + add, rem, subscriptions = self.get_subscription_changes(device, since, + now, domain) + updates = self.get_episode_changes(user, subscriptions, domain, + include_actions, since) + + return JsonResponse({ + 'add': add, + 'rem': rem, + 'updates': updates, + 'timestamp': get_timestamp(now), + }) + + + def get_subscription_changes(self, device, since, now, domain): + """ gets new, removed and current subscriptions """ + + # DB: get all podcast states for the device + podcast_states = podcast_states_for_device(device.id) + + add, rem = subscription_changes(device.id, podcast_states, since, now) + + subscriptions = filter(lambda s: s.is_subscribed_on(device), podcast_states) + # DB get podcast objects for the subscribed podcasts + subscriptions = podcasts_for_states(subscriptions) + + podcasts = dict( (p.url, p) for p in subscriptions ) + add = [podcast_data(podcasts.get(url), domain) for url in add ] + + return add, rem, subscriptions + + + def get_episode_changes(self, user, subscriptions, domain, include_actions, since): + devices = dict( (dev.id, dev.uid) for dev in user.devices ) + + # index subscribed podcasts by their Id for fast access + podcasts = dict( (p.get_id(), p) for p in subscriptions ) + + episode_updates = self.get_episode_updates(user, subscriptions, since) + + return [self.get_episode_data(status, podcasts, domain, + include_actions, user, devices) for status in episode_updates] + + + def get_episode_updates(self, user, subscribed_podcasts, since, + max_per_podcast=5): + """ Returns the episode updates since the timestamp """ + + if gevent: + # DB: get max_per_podcast episodes for each subscribed podcast + episode_jobs = [gevent.spawn(episodes_for_podcast, p, since, + limit=max_per_podcast) for p in subscribed_podcasts] + gevent.joinall(episode_jobs) + episodes = chain.from_iterable(job.get() for job in episode_jobs) + + # DB: get all episode states for all subscribed podcasts + e_action_jobs = [gevent.spawn(get_podcasts_episode_states, p, + user._id) for p in subscribed_podcasts] + gevent.joinall(e_action_jobs) + e_actions = chain.from_iterable(job.get() for job in e_action_jobs) + + else: + episodes = chain.from_iterable(episodes_for_podcast(p, since, + limit=max_per_podcast) for p in subscribed_podcasts) + + e_actions = chain.from_iterable(get_podcasts_episode_states(p, + user._id) for p in subscribed_podcasts) + + # TODO: get_podcasts_episode_states could be optimized by returning + # only actions within some time frame + + e_status = { e._id: EpisodeStatus(e, 'new', None) for e in episodes} + + for action in e_actions: + e_id = action['episode_id'] + + if not e_id in e_status: + continue + + episode = e_status[e_id].episode + + e_status[e_id] = EpisodeStatus(episode, action['action'], action) + + return e_status.itervalues() + + + def get_episode_data(self, episode_status, podcasts, domain, include_actions, user, devices): + """ Get episode data for an episode status object """ + + # TODO: shouldn't the podcast_id be in the episode status? + podcast_id = episode_status.episode.podcast + podcast = podcasts.get(podcast_id, None) + t = episode_data(episode_status.episode, domain, podcast) + t['status'] = episode_status.status + + # include latest action (bug 1419) + if include_actions and episode_status.action: + t['action'] = clean_episode_action_data(episode_status.action, user, devices) + + return t + + def get_since(self, request): + """ parses the "since" parameter """ + since_ = request.GET.get('since', None) + if since_ is None: + raise ValueError('parameter since missing') + try: + return datetime.fromtimestamp(float(since_)) + except ValueError as e: + raise ValueError("'since' is not a valid timestamp: %s" % str(e)) diff --git a/mygpo/api/urls.py b/mygpo/api/urls.py index a8769ba1..33a18a62 100644 --- a/mygpo/api/urls.py +++ b/mygpo/api/urls.py @@ -16,7 +16,7 @@ urlpatterns += patterns('mygpo.api.simple', url(r'^gpodder-examples\.(?P\w+)$', 'example_podcasts', name='example-opml'), ) -from mygpo.api.advanced import DeviceUpdates +from mygpo.api.advanced.updates import DeviceUpdates urlpatterns += patterns('mygpo.api.advanced', (r'^api/[12]/subscriptions/(?P[\w.-]+)/(?P[\w.-]+)\.json', 'subscriptions'), diff --git a/mygpo/users/models.py b/mygpo/users/models.py index 6900ece7..85cb816c 100644 --- a/mygpo/users/models.py +++ b/mygpo/users/models.py @@ -19,6 +19,7 @@ from mygpo.core.proxy import DocumentABCMeta, proxy_object from mygpo.decorators import repeat_on_conflict from mygpo.users.ratings import RatingMixin from mygpo.users.sync import SyncedDevicesMixin +from mygpo.users.subscriptions import subscription_changes, podcasts_for_states from mygpo.users.settings import FAV_FLAG, PUBLIC_SUB_PODCAST, SettingsMixin from mygpo.db.couchdb.podcast import podcasts_by_id, podcasts_to_dict from mygpo.db.couchdb.user import user_history, device_history @@ -352,6 +353,19 @@ class PodcastUserState(Document, SettingsMixin): return devices + def is_subscribed_on(self, device): + """ checks if the podcast is subscribed on the given device """ + + for action in reversed(self.actions): + if not action.device == device.id: + continue + + # we only need to check the latest action for the device + return (action.action == 'subscribe') + + # we haven't found any matching action + return False + def is_public(self): return self.get_wksetting(PUBLIC_SUB_PODCAST) @@ -388,17 +402,8 @@ class Device(Document, SettingsMixin): """ from mygpo.db.couchdb.podcast_state import podcast_states_for_device - - add, rem = [], [] podcast_states = podcast_states_for_device(self.id) - for p_state in podcast_states: - change = p_state.get_change_between(self.id, since, until) - if change == 'subscribe': - add.append( p_state.ref_url ) - elif change == 'unsubscribe': - rem.append( p_state.ref_url ) - - return add, rem + return subscription_changes(self.id, podcast_states, since, until) def get_latest_changes(self): @@ -433,14 +438,7 @@ class Device(Document, SettingsMixin): the podcast """ states = self.get_subscribed_podcast_states() - podcast_ids = [state.podcast for state in states] - podcasts = podcasts_to_dict(podcast_ids) - - for state in states: - podcast = proxy_object(podcasts[state.podcast], url=state.ref_url) - podcasts[state.podcast] = podcast - - return podcasts.values() + return podcasts_for_states(states) def __hash__(self): diff --git a/mygpo/users/subscriptions.py b/mygpo/users/subscriptions.py index 36e5037a..e15f5e91 100644 --- a/mygpo/users/subscriptions.py +++ b/mygpo/users/subscriptions.py @@ -1,4 +1,6 @@ +from mygpo.core.proxy import proxy_object from mygpo.db.couchdb.user import get_num_listened_episodes +from mygpo.db.couchdb.podcast import podcasts_to_dict class PodcastSorter(object): @@ -58,3 +60,30 @@ class PodcastPercentageListenedSorter(PodcastSorter): podcast.episodes_listened = 0 return sorted(self.podcasts, key=SORT_KEY, reverse=True) + + +def subscription_changes(device_id, podcast_states, since, until): + """ returns subscription changes for the device and podcast states """ + + add, rem = [], [] + for p_state in podcast_states: + change = p_state.get_change_between(device_id, since, until) + if change == 'subscribe': + add.append( p_state.ref_url ) + elif change == 'unsubscribe': + rem.append( p_state.ref_url ) + + return add, rem + + +def podcasts_for_states(podcast_states): + """ returns the podcasts corresponding to the podcast states """ + + podcast_ids = [state.podcast for state in podcast_states] + podcasts = podcasts_to_dict(podcast_ids) + + for state in podcast_states: + podcast = proxy_object(podcasts[state.podcast], url=state.ref_url) + podcasts[state.podcast] = podcast + + return podcasts.values() -- 2.11.4.GIT