move all User and EpisodetUserState db queries into separate module
[mygpo.git] / mygpo / api / advanced / __init__.py
blobaa1f364ef3a767a2be4819af27166c6435266f5e
2 # This file is part of my.gpodder.org.
4 # my.gpodder.org is free software: you can redistribute it and/or modify it
5 # under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or (at your
7 # option) any later version.
9 # my.gpodder.org is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
12 # License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with my.gpodder.org. If not, see <http://www.gnu.org/licenses/>.
18 from functools import partial
19 from itertools import imap, chain
20 from collections import defaultdict, namedtuple
21 from datetime import datetime
23 import dateutil.parser
24 import gevent
26 from django.http import HttpResponse, HttpResponseBadRequest, Http404, HttpResponseNotFound
27 from django.contrib.sites.models import RequestSite
28 from django.db import IntegrityError
29 from django.views.decorators.csrf import csrf_exempt
30 from django.views.decorators.cache import never_cache
32 from mygpo.api.constants import EPISODE_ACTION_TYPES, DEVICE_TYPES
33 from mygpo.api.httpresponse import JsonResponse
34 from mygpo.api.sanitizing import sanitize_url, sanitize_urls
35 from mygpo.api.advanced.directory import episode_data, podcast_data
36 from mygpo.api.backend import get_device, BulkSubscribe
37 from mygpo.couch import BulkException, bulk_save_retry, get_main_database
38 from mygpo.log import log
39 from mygpo.utils import parse_time, format_time, parse_bool, get_timestamp
40 from mygpo.decorators import allowed_methods, repeat_on_conflict
41 from mygpo.core import models
42 from mygpo.core.models import SanitizingRule, Podcast
43 from mygpo.users.models import PodcastUserState, EpisodeAction, \
44 EpisodeUserState, DeviceDoesNotExist, DeviceUIDException
45 from mygpo.json import json, JSONDecodeError
46 from mygpo.api.basic_auth import require_valid_user, check_username
47 from mygpo.db.couchdb.episode import episode_by_id, \
48 favorite_episodes_for_user, episodes_for_podcast
49 from mygpo.db.couchdb.podcast import podcast_for_url
50 from mygpo.db.couchdb.podcast_state import subscribed_podcast_ids_by_device
51 from mygpo.db.couchdb.episode_state import get_podcasts_episode_states, \
52 episode_state_for_ref_urls, get_episode_actions
55 # keys that are allowed in episode actions
56 EPISODE_ACTION_KEYS = ('position', 'episode', 'action', 'device', 'timestamp',
57 'started', 'total', 'podcast')
60 @csrf_exempt
61 @require_valid_user
62 @check_username
63 @never_cache
64 @allowed_methods(['GET', 'POST'])
65 def subscriptions(request, username, device_uid):
67 now = datetime.now()
68 now_ = get_timestamp(now)
70 if request.method == 'GET':
72 try:
73 device = request.user.get_device_by_uid(device_uid)
74 except DeviceDoesNotExist as e:
75 return HttpResponseNotFound(str(e))
77 since_ = request.GET.get('since', None)
78 if since_ == None:
79 return HttpResponseBadRequest('parameter since missing')
80 try:
81 since = datetime.fromtimestamp(float(since_))
82 except ValueError:
83 return HttpResponseBadRequest('since-value is not a valid timestamp')
85 changes = get_subscription_changes(request.user, device, since, now)
87 return JsonResponse(changes)
89 elif request.method == 'POST':
90 d = get_device(request.user, device_uid,
91 request.META.get('HTTP_USER_AGENT', ''))
93 if not request.raw_post_data:
94 return HttpResponseBadRequest('POST data must not be empty')
96 actions = json.loads(request.raw_post_data)
97 add = actions['add'] if 'add' in actions else []
98 rem = actions['remove'] if 'remove' in actions else []
100 add = filter(None, add)
101 rem = filter(None, rem)
103 try:
104 update_urls = update_subscriptions(request.user, d, add, rem)
105 except IntegrityError, e:
106 return HttpResponseBadRequest(e)
108 return JsonResponse({
109 'timestamp': now_,
110 'update_urls': update_urls,
114 def update_subscriptions(user, device, add, remove):
116 for a in add:
117 if a in remove:
118 raise IntegrityError('can not add and remove %s at the same time' % a)
120 add_s = list(sanitize_urls(add, 'podcast'))
121 rem_s = list(sanitize_urls(remove, 'podcast'))
123 assert len(add) == len(add_s) and len(remove) == len(rem_s)
125 updated_urls = filter(lambda (a, b): a != b, zip(add + remove, add_s + rem_s))
127 add_s = filter(None, add_s)
128 rem_s = filter(None, rem_s)
130 # If two different URLs (in add and remove) have
131 # been sanitized to the same, we ignore the removal
132 rem_s = filter(lambda x: x not in add_s, rem_s)
134 subscriber = BulkSubscribe(user, device)
136 for a in add_s:
137 subscriber.add_action(a, 'subscribe')
139 for r in rem_s:
140 subscriber.add_action(r, 'unsubscribe')
142 try:
143 subscriber.execute()
144 except BulkException as be:
145 for err in be.errors:
146 log('Advanced API: %(username)s: Updating subscription for '
147 '%(podcast_url)s on %(device_uid)s failed: '
148 '%(rerror)s (%(reason)s)'.format(username=user.username,
149 podcast_url=err.doc, device_uid=device.uid,
150 error=err.error, reason=err.reason)
153 return updated_urls
156 def get_subscription_changes(user, device, since, until):
157 add_urls, rem_urls = device.get_subscription_changes(since, until)
158 until_ = get_timestamp(until)
159 return {'add': add_urls, 'remove': rem_urls, 'timestamp': until_}
162 @csrf_exempt
163 @require_valid_user
164 @check_username
165 @never_cache
166 @allowed_methods(['GET', 'POST'])
167 def episodes(request, username, version=1):
169 version = int(version)
170 now = datetime.now()
171 now_ = get_timestamp(now)
172 ua_string = request.META.get('HTTP_USER_AGENT', '')
174 if request.method == 'POST':
175 try:
176 actions = json.loads(request.raw_post_data)
177 except (JSONDecodeError, UnicodeDecodeError) as e:
178 log('Advanced API: could not decode episode update POST data for user %s: %s' % (username, e))
179 return HttpResponseBadRequest()
181 try:
182 update_urls = update_episodes(request.user, actions, now, ua_string)
183 except DeviceUIDException as e:
184 import traceback
185 log('could not update episodes for user %s: %s %s: %s' % (username, e, traceback.format_exc(), actions))
186 return HttpResponseBadRequest(str(e))
188 return JsonResponse({'timestamp': now_, 'update_urls': update_urls})
190 elif request.method == 'GET':
191 podcast_url= request.GET.get('podcast', None)
192 device_uid = request.GET.get('device', None)
193 since_ = request.GET.get('since', None)
194 aggregated = parse_bool(request.GET.get('aggregated', False))
196 try:
197 since = datetime.fromtimestamp(float(since_)) if since_ else None
198 except ValueError:
199 return HttpResponseBadRequest('since-value is not a valid timestamp')
201 if podcast_url:
202 podcast = podcast_for_url(podcast_url)
203 if not podcast:
204 raise Http404
205 else:
206 podcast = None
208 if device_uid:
210 try:
211 device = request.user.get_device_by_uid(device_uid)
212 except DeviceDoesNotExist as e:
213 return HttpResponseNotFound(str(e))
215 else:
216 device = None
218 changes = get_episode_changes(request.user, podcast, device, since,
219 now, aggregated, version)
221 return JsonResponse(changes)
225 def convert_position(action):
226 """ convert position parameter for API 1 compatibility """
227 pos = getattr(action, 'position', None)
228 if pos is not None:
229 action.position = format_time(pos)
230 return action
234 def get_episode_changes(user, podcast, device, since, until, aggregated, version):
236 devices = dict( (dev.id, dev.uid) for dev in user.devices )
238 args = {}
239 if podcast is not None:
240 args['podcast_id'] = podcast.get_id()
242 if device is not None:
243 args['device_id'] = device.id
245 actions = get_episode_actions(user._id, since, until, **args)
247 if version == 1:
248 actions = imap(convert_position, actions)
250 clean_data = partial(clean_episode_action_data,
251 user=user, devices=devices)
253 actions = map(clean_data, actions)
254 actions = filter(None, actions)
256 if aggregated:
257 actions = dict( (a['episode'], a) for a in actions ).values()
259 until_ = get_timestamp(until)
261 return {'actions': actions, 'timestamp': until_}
266 def clean_episode_action_data(action, user, devices):
268 if None in (action.get('podcast', None), action.get('episode', None)):
269 return None
271 if 'device_id' in action:
272 device_id = action['device_id']
273 device_uid = devices.get(device_id)
274 if device_uid:
275 action['device'] = device_uid
277 del action['device_id']
279 # remove superfluous keys
280 for x in action.keys():
281 if x not in EPISODE_ACTION_KEYS:
282 del action[x]
284 # set missing keys to None
285 for x in EPISODE_ACTION_KEYS:
286 if x not in action:
287 action[x] = None
289 if action['action'] != 'play':
290 if 'position' in action:
291 del action['position']
293 if 'total' in action:
294 del action['total']
296 if 'started' in action:
297 del action['started']
299 if 'playmark' in action:
300 del action['playmark']
302 else:
303 action['position'] = action.get('position', False) or 0
305 return action
311 def update_episodes(user, actions, now, ua_string):
312 update_urls = []
314 grouped_actions = defaultdict(list)
316 # group all actions by their episode
317 for action in actions:
319 podcast_url = action['podcast']
320 podcast_url = sanitize_append(podcast_url, 'podcast', update_urls)
321 if podcast_url == '': continue
323 episode_url = action['episode']
324 episode_url = sanitize_append(episode_url, 'episode', update_urls)
325 if episode_url == '': continue
327 act = parse_episode_action(action, user, update_urls, now, ua_string)
328 grouped_actions[ (podcast_url, episode_url) ].append(act)
330 # Prepare the updates for each episode state
331 obj_funs = []
333 for (p_url, e_url), action_list in grouped_actions.iteritems():
334 episode_state = episode_state_for_ref_urls(user, p_url, e_url)
336 fun = partial(update_episode_actions, action_list=action_list)
337 obj_funs.append( (episode_state, fun) )
339 db = get_main_database()
340 bulk_save_retry(db, obj_funs)
342 return update_urls
345 def update_episode_actions(episode_state, action_list):
346 """ Adds actions to the episode state and saves if necessary """
348 len1 = len(episode_state.actions)
349 episode_state.add_actions(action_list)
351 if len(episode_state.actions) == len1:
352 return None
354 return episode_state
358 def parse_episode_action(action, user, update_urls, now, ua_string):
359 action_str = action.get('action', None)
360 if not valid_episodeaction(action_str):
361 raise Exception('invalid action %s' % action_str)
363 new_action = EpisodeAction()
365 new_action.action = action['action']
367 if action.get('device', False):
368 device = get_device(user, action['device'], ua_string)
369 new_action.device = device.id
371 if action.get('timestamp', False):
372 new_action.timestamp = dateutil.parser.parse(action['timestamp'])
373 else:
374 new_action.timestamp = now
375 new_action.timestamp = new_action.timestamp.replace(microsecond=0)
377 new_action.started = action.get('started', None)
378 new_action.playmark = action.get('position', None)
379 new_action.total = action.get('total', None)
381 return new_action
384 @csrf_exempt
385 @require_valid_user
386 @check_username
387 @never_cache
388 # Workaround for mygpoclient 1.0: It uses "PUT" requests
389 # instead of "POST" requests for uploading device settings
390 @allowed_methods(['POST', 'PUT'])
391 def device(request, username, device_uid):
392 d = get_device(request.user, device_uid,
393 request.META.get('HTTP_USER_AGENT', ''))
395 data = json.loads(request.raw_post_data)
397 if 'caption' in data:
398 if not data['caption']:
399 return HttpResponseBadRequest('caption must not be empty')
400 d.name = data['caption']
402 if 'type' in data:
403 if not valid_devicetype(data['type']):
404 return HttpResponseBadRequest('invalid device type %s' % data['type'])
405 d.type = data['type']
408 request.user.update_device(d)
410 return HttpResponse()
413 def valid_devicetype(type):
414 for t in DEVICE_TYPES:
415 if t[0] == type:
416 return True
417 return False
419 def valid_episodeaction(type):
420 for t in EPISODE_ACTION_TYPES:
421 if t[0] == type:
422 return True
423 return False
426 @csrf_exempt
427 @require_valid_user
428 @check_username
429 @never_cache
430 @allowed_methods(['GET'])
431 def devices(request, username):
432 devices = filter(lambda d: not d.deleted, request.user.devices)
433 devices = map(device_data, devices)
434 return JsonResponse(devices)
437 def device_data(device):
438 return dict(
439 id = device.uid,
440 caption = device.name,
441 type = device.type,
442 subscriptions= len(subscribed_podcast_ids_by_device(device)),
447 def get_podcast_data(podcasts, domain, url):
448 """ Gets podcast data for a URL from a dict of podcasts """
449 podcast = podcasts.get(url)
450 return podcast_data(podcast, domain)
453 def get_episode_data(podcasts, domain, clean_action_data, include_actions, episode_status):
454 """ Get episode data for an episode status object """
455 podcast_id = episode_status.episode.podcast
456 podcast = podcasts.get(podcast_id, None)
457 t = episode_data(episode_status.episode, domain, podcast)
458 t['status'] = episode_status.status
460 # include latest action (bug 1419)
461 if include_actions and episode_status.action:
462 t['action'] = clean_action_data(episode_status.action)
464 return t
467 @csrf_exempt
468 @require_valid_user
469 @check_username
470 @never_cache
471 def updates(request, username, device_uid):
472 now = datetime.now()
473 now_ = get_timestamp(now)
475 try:
476 device = request.user.get_device_by_uid(device_uid)
477 except DeviceDoesNotExist as e:
478 return HttpResponseNotFound(str(e))
480 since_ = request.GET.get('since', None)
481 if since_ == None:
482 return HttpResponseBadRequest('parameter since missing')
483 try:
484 since = datetime.fromtimestamp(float(since_))
485 except ValueError:
486 return HttpResponseBadRequest('since-value is not a valid timestamp')
488 include_actions = parse_bool(request.GET.get('include_actions', False))
490 ret = get_subscription_changes(request.user, device, since, now)
491 domain = RequestSite(request).domain
493 subscriptions = list(device.get_subscribed_podcasts())
495 podcasts = dict( (p.url, p) for p in subscriptions )
496 prepare_podcast_data = partial(get_podcast_data, podcasts, domain)
498 ret['add'] = map(prepare_podcast_data, ret['add'])
500 devices = dict( (dev.id, dev.uid) for dev in request.user.devices )
501 clean_action_data = partial(clean_episode_action_data,
502 user=request.user, devices=devices)
504 # index subscribed podcasts by their Id for fast access
505 podcasts = dict( (p.get_id(), p) for p in subscriptions )
506 prepare_episode_data = partial(get_episode_data, podcasts, domain,
507 clean_action_data, include_actions)
509 episode_updates = get_episode_updates(request.user, subscriptions, since)
510 ret['updates'] = map(prepare_episode_data, episode_updates)
512 return JsonResponse(ret)
515 def get_episode_updates(user, subscribed_podcasts, since):
516 """ Returns the episode updates since the timestamp """
518 EpisodeStatus = namedtuple('EpisodeStatus', 'episode status action')
520 episode_status = {}
522 # get episodes
523 episode_jobs = [gevent.spawn(episodes_for_podcast, p, since) for p in
524 subscribed_podcasts]
525 gevent.joinall(episode_jobs)
526 episodes = chain.from_iterable(job.get() for job in episode_jobs)
528 for episode in episodes:
529 episode_status[episode._id] = EpisodeStatus(episode, 'new', None)
531 # get episode states
532 e_action_jobs = [gevent.spawn(get_podcasts_episode_states(p, user._id))
533 for p in subscribed_podcasts]
534 gevent.joinall(e_action_jobs)
535 e_actions = chain.from_iterable(job.get() for job in e_action_jobs)
537 for action in e_actions:
538 e_id = action['episode_id']
540 if e_id in episode_status:
541 episode = episode_status[e_id].episode
542 else:
543 episode = episode_by_id(e_id)
545 episode_status[e_id] = EpisodeStatus(episode, action['action'], action)
547 return episode_status.itervalues()
550 @require_valid_user
551 @check_username
552 @never_cache
553 def favorites(request, username):
554 favorites = favorite_episodes_for_user(request.user)
555 domain = RequestSite(request).domain
556 e_data = lambda e: episode_data(e, domain)
557 ret = map(e_data, favorites)
558 return JsonResponse(ret)
561 def sanitize_append(url, obj_type, sanitized_list):
562 urls = sanitize_url(url, obj_type)
563 if url != urls:
564 sanitized_list.append( (url, urls) )
565 return urls