1 from operator
import itemgetter
2 from datetime
import datetime
, timedelta
4 from django
.db
import IntegrityError
6 from celery
import shared_task
7 from django_db_geventpool
.utils
import close_connection
9 from mygpo
.data
.podcast
import calc_similar_podcasts
10 from mygpo
.celery
import celery
11 from mygpo
.podcasts
.models
import Podcast
13 from celery
.utils
.log
import get_task_logger
# Module-level Celery task logger, named after this module.
logger = get_task_logger(__name__)
def update_podcasts(podcast_urls):
    """Run the feed downloader over ``podcast_urls`` and return podcast keys.

    :param podcast_urls: URLs passed straight to the feed downloader's
        ``update_podcasts``.
    :returns: list of ``pk`` values of the podcasts the downloader returned,
        skipping any falsy entries (e.g. ``None``) in its result.
    """
    # Imported inside the function rather than at module level
    # (presumably to avoid an import cycle -- confirm).
    from mygpo.data.feeddownloader import update_podcasts as update

    return [podcast.pk for podcast in update(podcast_urls) if podcast]
def update_related_podcasts(podcast_pk, max_related=20):
    """Add up to ``max_related`` similar podcasts to a podcast's related set.

    :param podcast_pk: primary key of the podcast to update; a missing pk
        propagates ``Podcast.DoesNotExist`` from ``Podcast.objects.get``.
    :param max_related: how many leading entries of
        ``calc_similar_podcasts`` to use.

    Each candidate is added individually; an ``IntegrityError`` on one of
    them is logged at warning level (with traceback) and skipped, so a
    single failing entry does not stop the remaining ones.
    """
    # Each item from calc_similar_podcasts is indexed with [0] to get the
    # podcast (presumably (podcast, score) pairs -- confirm).
    get_podcast = itemgetter(0)

    podcast = Podcast.objects.get(pk=podcast_pk)

    related = calc_similar_podcasts(podcast)[:max_related]
    related = map(get_podcast, related)

    for p in related:
        try:
            podcast.related_podcasts.add(p)
        except IntegrityError:
            logger.warning(
                "Integrity error while adding related podcast", exc_info=True
            )
# Scheduling period for podcast updates (one hour). schedule_updates uses it
# as its default look-ahead window, and both schedulers derive their per-run
# cap from it.
UPDATE_INTERVAL = timedelta(minutes=60)
def schedule_updates(interval=UPDATE_INTERVAL):
    """Schedule updates for podcasts whose next update is due within ``interval``.

    :param interval: ``timedelta`` look-ahead window starting now; podcasts
        whose next update falls in ``[now, now + interval]`` (as defined by
        ``next_update_between``) are handed to ``_schedule_updates``.
    """
    # NOTE(review): naive UTC timestamp; confirm this matches how
    # next_update_between compares against stored update times.
    now = datetime.utcnow()

    # Cap on updates queued per run: one per 10 seconds of UPDATE_INTERVAL
    # (360 for the default hour). Integer arithmetic keeps the slice bound
    # an int instead of a float.
    # NOTE(review): the cap follows UPDATE_INTERVAL, not ``interval``;
    # confirm that is intended when a non-default window is passed.
    max_updates = int(UPDATE_INTERVAL.total_seconds()) // 10

    # Podcasts due within ``interval``; only the pk column is loaded, with
    # the "urls" relation prefetched for _schedule_updates.
    podcasts = (
        Podcast.objects.all()
        .next_update_between(now, now + interval)
        .prefetch_related("urls")
        .only("pk")[:max_updates]
    )

    _schedule_updates(podcasts)
def schedule_updates_longest_no_update():
    """Schedule updates for the podcasts that have gone longest without one.

    Takes podcasts in ascending ``last_update`` order, capped per run, and
    hands them to ``_schedule_updates``.
    """
    # Cap on updates queued per run: one per 10 seconds of UPDATE_INTERVAL
    # (360 for the default hour). Integer arithmetic keeps the slice bound
    # an int instead of a float.
    max_updates = int(UPDATE_INTERVAL.total_seconds()) // 10

    podcasts = Podcast.objects.order_by("last_update")[:max_updates]
    _schedule_updates(podcasts)
84 def _schedule_updates(podcasts
):
85 """Queue an ``update_podcasts`` task for every podcast in ``podcasts``.

``podcasts`` is measured with ``len()`` (for the log line) and then
iterated, so it must support both; the visible callers pass sliced
``Podcast`` querysets.
"""
86 logger
.info("Scheduling %d podcasts for update", len(podcasts
))
88 # queue one update task per podcast
89 for podcast
in podcasts
:
90 # update_podcasts.delay() seems to block other task execution,
91 # therefore celery.send_task() is used instead. send_task dispatches by
# task name, so the string below must match the name update_podcasts is
# registered under (presumably its dotted module path -- confirm).
# NOTE(review): ``urls`` is not assigned in this block (nor at module level
# in the visible file), so this call would raise NameError as written. It
# presumably should hold the URL(s) of ``podcast`` -- the single positional
# argument update_podcasts receives as ``podcast_urls``. Restore the missing
# assignment.
93 celery
.send_task("mygpo.data.tasks.update_podcasts", args
=[urls
])