[Migration] Merge Episodes in PostgreSQL
[mygpo.git] / mygpo / maintenance / migrate.py
from __future__ import unicode_literals

from mygpo.core.models import Podcast as P, Episode as E, PodcastGroup as G
from django.contrib.contenttypes.models import ContentType
from django.db import transaction, IntegrityError
from django.utils.text import slugify
import json
from datetime import datetime
from mygpo.podcasts.models import (Podcast, Episode, URL, Slug, Tag,
    MergedUUID, PodcastGroup, )
from mygpo.db.couchdb.podcast_state import podcast_subscriber_count

import logging
logger = logging.getLogger(__name__)
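
# One-off migration of the CouchDB documents (Podcast, Episode,
# PodcastGroup) into the corresponding PostgreSQL models.  Each document
# arriving over the changes feed (see migrate() at the bottom) is wrapped
# in its couchdbkit class and upserted via update_or_create, keyed on its
# CouchDB id.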
def to_maxlength(cls, field, val):
    """ Cut val to the maximum length of cls's field """
    max_length = cls._meta.get_field(field).max_length
    orig_length = len(val)
    if orig_length > max_length:
        val = val[:max_length]
        logger.warn('%s.%s length reduced from %d to %d',
                    cls.__name__, field, orig_length, max_length)

    return val
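
# For illustration (the column width here is made up): if Episode.guid
# were a CharField(max_length=20), an overlong value would come back cut:
#
#   to_maxlength(Episode, 'guid', 'x' * 32)
#   # -> 'x' * 20, logging "Episode.guid length reduced from 32 to 20"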
def migrate_episode(e):

    # make sure the episode's podcast exists, if only as a stub record;
    # migrate_podcast will fill in its details later
    podcast, created = Podcast.objects.get_or_create(id=e.podcast)

    if created:
        logger.info('Created stub for podcast %s', e.podcast)

    e2, created = Episode.objects.update_or_create(id=e._id, defaults={
        'title': e.title or '',
        'subtitle': e.subtitle or '',
        'guid': to_maxlength(Episode, 'guid', e.guid) if e.guid is not None else None,
        'description': e.description or '',
        'content': e.content or '',
        'link': e.link,
        'released': e.released,
        'author': e.author,
        'duration': max(0, e.duration) if e.duration is not None else None,
        'filesize': max(0, e.filesize) if e.filesize is not None else None,
        'language': to_maxlength(Episode, 'language', e.language) if e.language is not None else None,
        'last_update': e.last_update,
        'outdated': e.outdated,
        'mimetypes': ','.join(e.mimetypes),
        'listeners': max(0, e.listeners) if e.listeners is not None else None,
        'content_types': ','.join(e.content_types),
        'flattr_url': to_maxlength(Episode, 'flattr_url', e.flattr_url) if e.flattr_url else None,
        'created': datetime.fromtimestamp(e.created_timestamp) if e.created_timestamp else datetime.utcnow(),
        'license': e.license,
        'podcast': podcast,
    })

    update_urls(e, e2)
    update_slugs(e, e2)
    update_ids(e, e2)
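
# Since update_or_create is keyed on the CouchDB _id, replaying the same
# change is harmless: it only refreshes the existing row.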
def migrate_podcast(p):
    logger.info('Migrating podcast %r', p)

    # podcasts inside a group are stored within the group document and
    # carry their own id; for standalone podcasts the document _id is used
    if p.group_member_name:
        pid = p.id
    else:
        pid = p._id

    p2, created = Podcast.objects.update_or_create(id=pid, defaults={
        'title': p.title or '',
        'subtitle': p.subtitle or '',
        'description': p.description or '',
        'link': p.link,
        'language': to_maxlength(Podcast, 'language', p.language) if p.language is not None else None,
        'created': datetime.fromtimestamp(p.created_timestamp) if p.created_timestamp else datetime.utcnow(),
        'last_update': p.last_update,
        'license': p.license,
        'flattr_url': to_maxlength(Podcast, 'flattr_url', p.flattr_url) if p.flattr_url else None,
        'outdated': p.outdated,
        'author': p.author,
        'logo_url': p.logo_url,
        'common_episode_title': p.common_episode_title or '',
        'new_location': p.new_location,
        'latest_episode_timestamp': p.latest_episode_timestamp,
        'episode_count': p.episode_count or 0,
        'hub': p.hub,
        'content_types': ','.join(p.content_types),
        'restrictions': ','.join(p.restrictions),
        'twitter': getattr(p, 'twitter', None),
        'group_member_name': p.group_member_name,
        'update_interval': p.update_interval,
        'subscribers': podcast_subscriber_count(p),
    })

    update_urls(p, p2)
    update_slugs(p, p2)
    update_tags(p, p2)
    update_ids(p, p2)

    return p2
def migrate_podcastgroup(g):
    logger.info('Migrating podcast group %r', g)

    g2, created = PodcastGroup.objects.update_or_create(id=g._id, defaults={
        'title': g.title,
    })

    for p in g.podcasts:
        p2 = migrate_podcast(p)
        p2.group = g2
        p2.save()

    update_slugs(g, g2)

    return g2
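
# URLs and slugs are ordered lists in PostgreSQL.  Both update functions
# below first renumber the existing rows to order values above anything
# the new numbering will use, presumably so that assigning the new, lower
# order values cannot collide with a uniqueness constraint on the order.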
def update_urls(old, new):

    existing_urls = {u.url: u for u in new.urls.all()}
    logger.info('%d existing URLs', len(existing_urls))

    new_urls = old.urls
    logger.info('%d new URLs', len(new_urls))

    with transaction.atomic():
        max_order = max([u.order for u in existing_urls.values()] + [len(new_urls)])
        logger.info('Renumbering URLs starting from %d', max_order)
        for n, url in enumerate(existing_urls.values(), max_order+1):
            url.order = n
            url.save()

    logger.info('%d existing URLs', len(existing_urls))
    for n, url in enumerate(new_urls):
        try:
            u = existing_urls.pop(url)
            u.order = n
            u.save()
        except KeyError:
            try:
                URL.objects.create(url=to_maxlength(URL, 'url', url),
                                   content_object=new,
                                   order=n,
                                   scope=new.scope,
                                   )
            except IntegrityError as ie:
                logger.warn('Could not create URL for %s: %s', new, ie)

    with transaction.atomic():
        delete = [u.pk for u in existing_urls.values()]
        logger.info('Deleting %d URLs', len(delete))
        URL.objects.filter(id__in=delete).delete()
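
# Slugs follow the same renumber-then-reassign scheme as the URLs above;
# the slug candidates additionally include the legacy oldid values, run
# through slugify.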
def update_slugs(old, new):

    existing_slugs = {s.slug: s for s in new.slugs.all()}
    logger.info('%d existing slugs', len(existing_slugs))

    new_slugs = filter(None, [old.slug] + old.merged_slugs + [old.oldid] + old.merged_oldids)
    new_slugs = map(unicode, new_slugs)
    new_slugs = map(slugify, new_slugs)
    logger.info('%d new slugs', len(new_slugs))

    with transaction.atomic():
        max_order = max([s.order for s in existing_slugs.values()] + [len(new_slugs)])
        logger.info('Renumbering slugs starting from %d', max_order)
        for n, slug in enumerate(existing_slugs.values(), max_order+1):
            slug.order = n
            slug.save()

    logger.info('%d existing slugs', len(existing_slugs))

    for n, slug in enumerate(new_slugs):
        try:
            s = existing_slugs.pop(slug)
            logger.info('Updating new slug %d: %s', n, slug)
            s.order = n
            s.save()
        except KeyError:
            logger.info('Creating new slug %d: %s', n, slug)
            try:
                Slug.objects.create(slug=to_maxlength(Slug, 'slug', slug),
                                    content_object=new,
                                    order=n,
                                    scope=new.scope,
                                    )
            except IntegrityError as ie:
                logger.warn('Could not create Slug for %s: %s', new, ie)

    with transaction.atomic():
        delete = [s.pk for s in existing_slugs.values()]
        logger.info('Deleting %d slugs', len(delete))
        Slug.objects.filter(id__in=delete).delete()
@transaction.atomic
def update_tags(old, new):
    # TODO: delete?
    for tag in old.tags.get('feed', []):
        t, created = Tag.objects.get_or_create(
            tag=to_maxlength(Tag, 'tag', tag),
            source=Tag.FEED,
            content_type=ContentType.objects.get_for_model(new),
            object_id=new.pk,
        )
@transaction.atomic
def update_ids(old, new):
    # TODO: delete?
    for mid in old.merged_ids:
        u, created = MergedUUID.objects.get_or_create(
            uuid=mid,
            content_type=ContentType.objects.get_for_model(new),
            object_id=new.pk,
        )
from couchdbkit import Database
db = Database('http://127.0.0.1:6984/mygpo_core_copy')
from couchdbkit.changes import ChangesStream, fold, foreach
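
# NB: hard-coded source database -- judging by its name, a local copy of
# mygpo_core; point this at the CouchDB instance that is to be migrated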
# doc_type -> (couchdbkit model to wrap the doc, migration callback);
# (None, None) entries are known document types that are deliberately skipped
MIGRATIONS = {
    'Podcast': (P, migrate_podcast),
    'Episode': (E, migrate_episode),
    'PodcastGroup': (G, migrate_podcastgroup),
    'PodcastList': (None, None),
    'PodcastSubscriberData': (None, None),
    'EmailMessage': (None, None),
    'ExamplePodcasts': (None, None),
}
def migrate_change(c):
    logger.info('Migrate seq %s', c['seq'])
    doctype = c['doc']['doc_type']

    cls, migrate = MIGRATIONS[doctype]

    if cls is None:
        return

    obj = cls.wrap(c['doc'])
    migrate(obj)
def migrate(since=1187918):
    with ChangesStream(db,
                       feed="continuous",
                       heartbeat=True,
                       include_docs=True,
                       since=since,
                       ) as stream:
        for change in stream:
            migrate_change(change)
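
# Meant to be run manually, e.g. from a Django shell; the default `since`
# value is presumably the sequence number at which the last run stopped:
#
#   >>> from mygpo.maintenance import migrate as m
#   >>> m.migrate()          # resume from the built-in default sequence
#   >>> m.migrate(since=0)   # or replay the entire changes feed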