Fix scheduling of podcast updates
[mygpo.git] / mygpo / data / tasks.py
blob5235fd28cb0cdacab7b1ef51c06fc696402f3a9f
1 from operator import itemgetter
2 from datetime import datetime, timedelta
4 from django.db import IntegrityError
6 from celery.decorators import periodic_task
8 from mygpo.data.podcast import calc_similar_podcasts
9 from mygpo.celery import celery
10 from mygpo.podcasts.models import Podcast
12 from celery.utils.log import get_task_logger
13 logger = get_task_logger(__name__)
@celery.task
def update_podcasts(podcast_urls):
    """ Update the podcasts at ``podcast_urls`` and return their primary keys

    The actual work is delegated to
    ``mygpo.data.feeddownloader.update_podcasts``; the result of this task
    is the list of ``pk`` values of the podcasts it returned.
    """
    # imported inside the task body rather than at module level
    from mygpo.data.feeddownloader import update_podcasts as run_update

    primary_keys = []
    for updated in run_update(podcast_urls):
        primary_keys.append(updated.pk)
    return primary_keys
@celery.task
def update_related_podcasts(podcast_pk, max_related=20):
    """ Calculate and store the podcasts related to a podcast

    ``podcast_pk`` is the primary key of the podcast to process and
    ``max_related`` is the maximum number of similar podcasts that are
    added to its ``related_podcasts``.

    Raises whatever ``Podcast.objects.get()`` raises when no podcast with
    the given primary key exists.
    """
    podcast = Podcast.objects.get(pk=podcast_pk)

    # each entry returned by calc_similar_podcasts() is indexable and carries
    # the podcast as its first item; only the first ``max_related`` are used
    similar = calc_similar_podcasts(podcast)[:max_related]
    related = [entry[0] for entry in similar]

    for p in related:
        try:
            podcast.related_podcasts.add(p)
        except IntegrityError:
            # best effort: log the failure and continue with the next one
            # (Logger.warn is a deprecated alias of Logger.warning)
            logger.warning('Integrity error while adding related podcast',
                           exc_info=True)
# how often the periodic scheduling tasks run; also the default look-ahead
# window of schedule_updates()
UPDATE_INTERVAL = timedelta(hours=1)
@periodic_task(run_every=UPDATE_INTERVAL)
def schedule_updates(interval=UPDATE_INTERVAL):
    """ Schedules podcast updates that are due within ``interval``

    ``interval`` is a ``timedelta``; podcasts selected by
    ``next_update_between(now, now + interval)`` are queued for update,
    limited to one podcast per 20 seconds of ``UPDATE_INTERVAL``.
    """
    # NOTE(review): naive UTC timestamp -- presumably matches how the next
    # update time is stored; confirm before switching to aware datetimes
    now = datetime.utcnow()

    # max number of updates to schedule (one every 20s); converted to int
    # because total_seconds() returns a float and this is a slice bound
    max_updates = int(UPDATE_INTERVAL.total_seconds() // 20)

    # fetch podcasts for which an update is due within ``interval``
    podcasts = Podcast.objects.all()\
                              .next_update_between(now, now+interval)\
                              .prefetch_related('urls')\
                              .only('pk')[:max_updates]

    _schedule_updates(podcasts)
@periodic_task(run_every=UPDATE_INTERVAL)
def schedule_updates_longest_no_update():
    """ Schedule podcasts for update that have not been updated for longest

    Podcasts are ordered by ``last_update`` and the first ones, up to one
    per 20 seconds of ``UPDATE_INTERVAL``, are queued for update.
    """
    # max number of updates to schedule (one every 20s); converted to int
    # because total_seconds() returns a float and this is a slice bound
    max_updates = int(UPDATE_INTERVAL.total_seconds() // 20)

    podcasts = Podcast.objects.order_by('last_update')[:max_updates]
    _schedule_updates(podcasts)
def _schedule_updates(podcasts):
    """ Queue one ``update_podcasts`` task for every podcast in ``podcasts``

    ``podcasts`` must offer ``count()`` and be iterable; each item's ``url``
    is passed, wrapped in a single-element list, to the task.
    """
    num_podcasts = podcasts.count()
    logger.info('Scheduling %d podcasts for update', num_podcasts)

    # The task is dispatched by name through celery.send_task() because
    # update_podcasts.delay() seemed to block other task execution.
    task_name = 'mygpo.data.tasks.update_podcasts'
    for entry in podcasts:
        celery.send_task(task_name, args=[[entry.url]])