Merge pull request #793 from gpodder/remove-advertise
[mygpo.git] / mygpo / data / tasks.py
blob 0c642aead95643784934e49b26c1475c17a575b2
1 from operator import itemgetter
2 from datetime import datetime, timedelta
4 from django.db import IntegrityError
6 from celery import shared_task
7 from django_db_geventpool.utils import close_connection
9 from mygpo.data.podcast import calc_similar_podcasts
10 from mygpo.celery import celery
11 from mygpo.podcasts.models import Podcast
13 from celery.utils.log import get_task_logger
15 logger = get_task_logger(__name__)
@shared_task
@close_connection
def update_podcasts(podcast_urls):
    """Update the podcasts found at ``podcast_urls`` and return their pks.

    Delegates to ``mygpo.data.feeddownloader.update_podcasts``. Falsy
    results (e.g. ``None``) from the updater are skipped; the primary keys
    of the remaining podcast objects are returned in the order produced.
    """
    # Local import kept from the original; presumably avoids an import
    # cycle at module load time — TODO confirm.
    from mygpo.data.feeddownloader import update_podcasts as update

    updated = update(podcast_urls)
    return [podcast.pk for podcast in updated if podcast]
@shared_task
@close_connection
def update_related_podcasts(podcast_pk, max_related=20):
    """Attach up to ``max_related`` similar podcasts to the given podcast.

    The candidates come from ``calc_similar_podcasts``; the first element of
    each returned entry is what gets added to ``related_podcasts``. An
    ``IntegrityError`` while adding one candidate is logged as a warning
    and the remaining candidates are still processed.
    """
    podcast = Podcast.objects.get(pk=podcast_pk)

    ranked = calc_similar_podcasts(podcast)

    for similar in map(itemgetter(0), ranked[:max_related]):
        try:
            podcast.related_podcasts.add(similar)
        except IntegrityError:
            logger.warning(
                "Integrity error while adding related podcast", exc_info=True
            )
# Default look-ahead window for schedule_updates: podcasts whose next update
# falls within this window are scheduled. Also used to size how many updates
# a single scheduling run may queue.
UPDATE_INTERVAL = timedelta(minutes=60)
@shared_task
@close_connection
def schedule_updates(interval=UPDATE_INTERVAL):
    """Schedule podcast updates that are due within ``interval``.

    Selects podcasts whose next update lies between now (naive UTC, from
    ``datetime.utcnow()``) and ``now + interval``, capped at one update per
    10 seconds of ``interval``, and hands them to ``_schedule_updates``.

    :param interval: ``timedelta`` look-ahead window; defaults to
        ``UPDATE_INTERVAL``.
    """
    now = datetime.utcnow()

    # Max number of updates to schedule: one every 10s of the window.
    # Derived from ``interval`` (not the module default) so a custom window
    # scales the budget too; int() because total_seconds() returns a float
    # and slice bounds should be integers.
    max_updates = int(interval.total_seconds() // 10)

    # fetch podcasts for which an update is due within ``interval``
    podcasts = (
        Podcast.objects.all()
        .next_update_between(now, now + interval)
        .prefetch_related("urls")
        .only("pk")[:max_updates]
    )

    _schedule_updates(podcasts)
@shared_task
@close_connection
def schedule_updates_longest_no_update():
    """Schedule podcasts for update that have not been updated for longest

    Takes podcasts in ascending ``last_update`` order, up to one per 10
    seconds of ``UPDATE_INTERVAL``, and hands them to ``_schedule_updates``.
    """
    # max number of updates to schedule (one every 10s of UPDATE_INTERVAL);
    # int() because total_seconds() returns a float and slice bounds should
    # be integers
    max_updates = int(UPDATE_INTERVAL.total_seconds() // 10)

    # NOTE(review): if last_update is nullable, where NULLs sort depends on
    # the database (PostgreSQL puts them last in ascending order), so
    # never-updated podcasts might be picked last — confirm this is intended.
    podcasts = (
        Podcast.objects.order_by("last_update")
        # _schedule_updates reads podcast.url; prefetch URLs as
        # schedule_updates does (presumably saves a query per podcast)
        .prefetch_related("urls")[:max_updates]
    )
    _schedule_updates(podcasts)
def _schedule_updates(podcasts):
    """Queue one ``update_podcasts`` Celery task per podcast.

    ``podcasts`` must support ``len()`` (the count is logged first) and be
    iterable. Each queued task receives a one-element list holding that
    podcast's ``url``.
    """
    count = len(podcasts)
    logger.info("Scheduling %d podcasts for update", count)

    task_name = "mygpo.data.tasks.update_podcasts"
    for podcast in podcasts:
        # celery.send_task() instead of update_podcasts.delay(): the latter
        # seemed to block other task execution
        celery.send_task(task_name, args=[[podcast.url]])