[Migration] Get Podcasts / Episodes from PostgreSQL
[mygpo.git] / mygpo / db / couchdb / podcast.py
blob 7011674272f1e0bfbec794e99053b49e043b95ae
1 from hashlib import sha1
2 from datetime import datetime
4 from restkit import RequestFailed
5 from couchdbkit import MultipleResultsFound
7 from django.core.cache import cache
9 from mygpo.core.models import Podcast, PodcastGroup, PodcastSubscriberData
10 from mygpo.core.signals import incomplete_obj
11 from mygpo.decorators import repeat_on_conflict
12 from mygpo.cache import cache_result
13 from mygpo.utils import get_timestamp
14 from mygpo.db.couchdb import get_main_database, get_userdata_database, \
15 lucene_query
16 from mygpo.db import QueryParameterMissing
17 from mygpo.db.couchdb import get_main_database, get_single_result
18 from mygpo.db.couchdb.utils import multi_request_view, is_couchdb_id
20 import logging
# Module-level logger, named after this module's import path.
logger = logging.getLogger(name=__name__)
def podcast_by_id_uncached(podcast_id, current_id=False):
    """Fetch a single podcast by id, bypassing any cache.

    The id is looked up in the ``podcasts/by_id`` view of the main database
    and the row is converted by ``_wrap_podcast_group``.  When the resulting
    podcast is flagged ``needs_update``, the ``incomplete_obj`` signal is
    sent for it.

    :param podcast_id: id of the podcast; must be non-empty.
    :param current_id: accepted for backwards compatibility; not used here.
    :raises QueryParameterMissing: if ``podcast_id`` is empty.
    :returns: the podcast, or None when the lookup yields nothing.
    """
    if not podcast_id:
        raise QueryParameterMissing('podcast_id')

    found = get_single_result(
        get_main_database(),
        'podcasts/by_id',
        key=podcast_id,
        include_docs=True,
        wrapper=_wrap_podcast_group,
    )

    if not found:
        return None

    if found.needs_update:
        incomplete_obj.send_robust(sender=found)

    return found


# Cached variant of podcast_by_id_uncached (timeout=60*60, presumably
# seconds, i.e. one hour -- confirm against mygpo.cache.cache_result).
podcast_by_id = cache_result(timeout=60 * 60)(podcast_by_id_uncached)
def podcasts_by_id(ids):
    """Fetch several podcasts by id with a single view request.

    :param ids: sequence of podcast ids.  ``None`` is an error; an empty
        sequence returns ``[]`` without querying the database.
    :raises QueryParameterMissing: if ``ids`` is None.
    :returns: a list with one entry per row returned by the
        ``podcasts/by_id`` view, each converted by ``_wrap_podcast_group``.
    """
    if ids is None:
        raise QueryParameterMissing('ids')

    if not ids:
        return []

    rows = Podcast.view('podcasts/by_id',
        keys=ids,
        include_docs=True,
        wrap_doc=False,
    )

    # Build a real list rather than using map(): on Python 3 map() is a
    # one-shot iterator that the loop below would exhaust before it is
    # returned, and callers concatenate this result with other lists.
    podcasts = [_wrap_podcast_group(row) for row in rows]

    for podcast in podcasts:
        # _wrap_podcast_group may return a falsy value for group rows
        # (presumably when get_podcast_by_id finds no match -- TODO confirm);
        # don't crash the whole batch on it.
        if podcast is not None and podcast.needs_update:
            incomplete_obj.send_robust(sender=podcast)

    return podcasts
def podcasts_groups_by_id(ids):
    """Get podcast groups and top-level podcasts for the given ids.

    Returns an iterator that yields one item per row of the
    ``podcasts/podcasts_groups`` view: the object built by ``_wrap_pg``, or
    None when ``_wrap_pg`` produced nothing for that row.  The
    ``incomplete_obj`` signal is sent for every object flagged
    ``needs_update``.

    :param ids: sequence of ids; an empty sequence yields nothing.
    :raises QueryParameterMissing: if ``ids`` is None.  This is raised when
        the function is called, not deferred to the first iteration.
    """
    # Validate here, outside the generator: inside a generator function this
    # check would only run once the caller started iterating.
    if ids is None:
        raise QueryParameterMissing('ids')

    return _iter_podcasts_groups(ids)


def _iter_podcasts_groups(ids):
    """Yield the wrapped ``podcasts/podcasts_groups`` rows for ``ids``."""
    if not ids:
        return

    db = get_main_database()
    res = db.view('podcasts/podcasts_groups',
        keys=ids,
        include_docs=True,
    )

    for r in res:
        obj = _wrap_pg(r)

        if not obj:
            yield None
            continue

        if obj.needs_update:
            incomplete_obj.send_robust(sender=obj)

        yield obj
def _wrap_pg(doc):
    """Wrap a ``podcasts/podcasts_groups`` view row into a model object.

    :param doc: a view row (dict) whose ``'doc'`` entry holds the included
        document.
    :returns: a Podcast or PodcastGroup according to the document's
        ``doc_type``.  Returns None if the row carries no document or if the
        ``doc_type`` is missing or unknown; the unknown case is logged.
    """
    # .get(): rows that could not be resolved (CouchDB error rows such as
    # {'key': ..., 'error': 'not_found'}) may have no 'doc' entry at all.
    doc = doc.get('doc')

    if not doc:
        return None

    doc_type = doc.get('doc_type')

    if doc_type == 'Podcast':
        return Podcast.wrap(doc)

    if doc_type == 'PodcastGroup':
        return PodcastGroup.wrap(doc)

    logger.error('received unknown doc_type "%s"', doc_type)
    return None
def podcasts_to_dict(ids, use_cache=False):
    """Map podcast ids to podcast objects.

    :param ids: iterable of podcast ids; duplicates are ignored.
    :param use_cache: if true, look the ids up in the Django cache first,
        fetch only the misses from the database, and store the fetched
        podcasts in the cache (keyed by ``get_id()``) afterwards.
    :raises QueryParameterMissing: if ``ids`` is None.
    :returns: dict mapping every id returned by each podcast's
        ``get_ids()`` to that podcast.  Entries that are error dicts
        ``{'key': ..., 'error': ...}`` map their key to None.
    """
    if ids is None:
        raise QueryParameterMissing('ids')

    if not ids:
        return {}

    ids = list(set(ids))
    objs = {}

    cache_objs = []
    if use_cache:
        res = cache.get_many(ids)
        cache_objs.extend(res.values())
        # Test membership on the dict itself: O(1) per id, whereas
        # res.keys() is a list on Python 2 and made this O(len(ids) * len(res)).
        ids = [x for x in ids if x not in res]

    # Materialise as a list: db_objs is iterated twice (here and for
    # cache.set_many) and concatenated with cache_objs.  Empty results are
    # skipped because they have neither get_ids() nor get_id().
    db_objs = [obj for obj in podcasts_by_id(ids) if obj is not None]

    for obj in cache_objs + db_objs:

        # get_multi returns dict {'key': _id, 'error': 'not found'}
        # for non-existing objects
        if isinstance(obj, dict) and 'error' in obj:
            objs[obj['key']] = None
            continue

        for i in obj.get_ids():
            objs[i] = obj

    if use_cache:
        cache.set_many(dict((obj.get_id(), obj) for obj in db_objs))

    return objs
def subscriberdata_for_podcast(podcast_id):
    """Return the PodcastSubscriberData for ``podcast_id``.

    The data is read from the ``podcasts/subscriber_data`` view.  When
    nothing is found, a new, unsaved PodcastSubscriberData with its
    ``podcast`` attribute set to ``podcast_id`` is returned instead, so
    callers always receive an object.

    :raises QueryParameterMissing: if ``podcast_id`` is empty.
    """
    if not podcast_id:
        raise QueryParameterMissing('podcast_id')

    subscriber_data = get_single_result(
        get_main_database(),
        'podcasts/subscriber_data',
        key=podcast_id,
        include_docs=True,
        schema=PodcastSubscriberData,
    )

    if subscriber_data:
        return subscriber_data

    placeholder = PodcastSubscriberData()
    placeholder.podcast = podcast_id
    return placeholder
def _wrap_podcast_group(res):
    """Turn a ``podcasts/by_id`` view row into a Podcast.

    A row whose document has ``doc_type == 'Podcast'`` is wrapped directly.
    Any other document is wrapped as a PodcastGroup, and the member podcast
    matching the row's key is returned from it, via ``get_podcast_by_id``.
    """
    doc = res['doc']

    if doc['doc_type'] == 'Podcast':
        return Podcast.wrap(doc)

    group = PodcastGroup.wrap(doc)
    # The row key is the id that was queried (key=/keys= on the view), which
    # identifies one podcast inside the group document.
    podcast_id = res['key']
    return group.get_podcast_by_id(podcast_id)
def search_wrapper(result):
    """Wrap a ``podcasts/search`` result row into a Podcast or PodcastGroup.

    The wrapped object's ``_id`` is overwritten with the row's ``id``.

    :returns: the wrapped object, or None (after logging an error) when the
        document's ``doc_type`` is neither 'Podcast' nor 'PodcastGroup'.
    """
    doc = result['doc']
    doc_type = doc.get('doc_type')

    if doc_type == 'Podcast':
        p = Podcast.wrap(doc)
    elif doc_type == 'PodcastGroup':
        p = PodcastGroup.wrap(doc)
    else:
        # Without this branch ``p`` would be unbound below (UnboundLocalError).
        logger.error('received unknown doc_type "%s"', doc_type)
        return None

    p._id = result['id']
    return p
@cache_result(timeout=60*60)
def search(q, offset=0, num_results=20):
    """Full-text search over podcast titles and descriptions.

    :param q: the search query; an empty query returns ``([], 0)`` without
        touching the database.
    :param offset: number of results to skip.
    :param num_results: maximum number of results to return.
    :returns: tuple ``(podcasts, total_rows)``.  Returns ``([], 0)`` when the
        search request raises RequestFailed.
    """
    # NOTE(review): the ([], 0) returned on RequestFailed is presumably
    # cached by cache_result like any other result -- confirm if that's wanted.
    if not q:
        return [], 0

    db = get_main_database()

    FIELDS = ['title', 'description']
    q = lucene_query(FIELDS, q)

    try:
        res = db.search('podcasts/search',
            wrapper=search_wrapper,
            include_docs=True,
            limit=num_results,
            stale='update_after',
            skip=offset,
            q=q,
        )

        # Iterating res and reading total_rows are presumably what hit the
        # search backend, so both stay inside the try.
        podcasts = list(res)
        total = res.total_rows

    except RequestFailed:
        # Deliberate best-effort: a failing search yields no results, but
        # leave a trace instead of failing silently.
        logger.exception('podcast search failed for query %r', q)
        return [], 0

    # Defensively drop rows the wrapper could not turn into an object.
    podcasts = [podcast for podcast in podcasts if podcast is not None]

    for podcast in podcasts:
        if podcast.needs_update:
            incomplete_obj.send_robust(sender=podcast)

    return podcasts, total
def reload_podcast(podcast):
    """Re-read ``podcast`` from the database, bypassing the cache.

    Used as ``reload_f`` by the ``repeat_on_conflict`` decorators below.
    """
    podcast_id = podcast.get_id()
    return podcast_by_id_uncached(podcast_id)
@repeat_on_conflict(['podcast'], reload_f=reload_podcast)
def update_additional_data(podcast, twitter):
    """Set ``podcast.twitter``, save the podcast, then clear the whole cache.

    ``repeat_on_conflict`` presumably retries with a podcast re-read via
    ``reload_podcast`` when saving conflicts -- see mygpo.decorators.
    """
    podcast.twitter = twitter
    podcast.save()

    # No finer-grained invalidation exists yet, so every cached entry is dropped.
    cache.clear()
@repeat_on_conflict(['podcast'], reload_f=reload_podcast)
def update_related_podcasts(podcast, related):
    """Store ``related`` as ``podcast.related_podcasts`` and save.

    When the current value already compares equal to ``related``, nothing
    is assigned and no save happens.
    """
    # Written as not (a == b) rather than a != b so the result depends only
    # on __eq__, exactly as in a plain equality check.
    unchanged = podcast.related_podcasts == related
    if not unchanged:
        podcast.related_podcasts = related
        podcast.save()
@repeat_on_conflict(['podcast'], reload_f=reload_podcast)
def delete_podcast(podcast):
    """Delete ``podcast`` by calling its ``delete()`` method; returns None."""
    podcast.delete()
    return None