Threading: Use util.run_in_background to spawn threads
[gpodder.git] / src / gpodder / my.py
blob aceaff2749da497b994d259ed807d68212b6598c
1 #!/usr/bin/python
2 # -*- coding: utf-8 -*-
4 # gPodder - A media aggregator and podcast client
5 # Copyright (c) 2005-2012 Thomas Perl and the gPodder Team
7 # gPodder is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # gPodder is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 # my.py -- mygpo Client Abstraction for gPodder
24 # Thomas Perl <thp@gpodder.org>; 2010-01-19
27 import gpodder
28 _ = gpodder.gettext
30 import atexit
31 import datetime
32 import calendar
33 import os
34 import sys
35 import time
37 import logging
38 logger = logging.getLogger(__name__)
40 from gpodder import util
41 from gpodder import minidb
43 # Append gPodder's user agent to mygpoclient's user agent
44 import mygpoclient
45 mygpoclient.user_agent += ' ' + gpodder.user_agent
# Minimum mygpoclient release this module is written against
MYGPOCLIENT_REQUIRED = '1.4'

# Refuse to start with an unsuitable mygpoclient: either the library has
# no require_version() helper at all, or the helper rejects our minimum.
if not (hasattr(mygpoclient, 'require_version') and
        mygpoclient.require_version(MYGPOCLIENT_REQUIRED)):
    # Same text as a "print >>sys.stderr" would emit (message + newline)
    sys.stderr.write("""
Please upgrade your mygpoclient library.
See http://thp.io/2010/mygpoclient/

Required version: %s
Installed version: %s
""" % (MYGPOCLIENT_REQUIRED, mygpoclient.__version__) + '\n')
    sys.exit(1)
60 from mygpoclient import api
61 from mygpoclient import public
63 from mygpoclient import util as mygpoutil
65 EXAMPLES_OPML = 'http://gpodder.org/directory.opml'
66 TOPLIST_OPML = 'http://gpodder.org/toplist.opml'
68 # Database model classes
class SinceValue(object):
    """Database model: the last "since" value for a host/device/category.

    host      -- server name the value belongs to
    device_id -- device identifier the value belongs to
    category  -- either SinceValue.PODCASTS or SinceValue.EPISODES
    since     -- the stored "since" value (defaults to 0)
    """

    # Field name -> field type
    __slots__ = {
        'host': str,
        'device_id': str,
        'category': int,
        'since': int,
    }

    # Possible values for the "category" field
    PODCASTS = 0
    EPISODES = 1

    def __init__(self, host, device_id, category, since=0):
        self.host, self.device_id = host, device_id
        self.category, self.since = category, since
class SubscribeAction(object):
    """Database model: a subscription change (add or remove) for one URL.

    action_type -- either SubscribeAction.ADD or SubscribeAction.REMOVE
    url         -- the podcast URL the action applies to
    """

    # Field name -> field type
    __slots__ = {
        'action_type': int,
        'url': str,
    }

    # Possible values for the "action_type" field
    ADD = 0
    REMOVE = 1

    def __init__(self, action_type, url):
        self.action_type, self.url = action_type, url

    @property
    def is_add(self):
        """True if this action adds a subscription."""
        return self.action_type == self.ADD

    @property
    def is_remove(self):
        """True if this action removes a subscription."""
        return self.action_type == self.REMOVE

    @classmethod
    def add(cls, url):
        """Create an "add" action for the given URL."""
        return cls(cls.ADD, url)

    @classmethod
    def remove(cls, url):
        """Create a "remove" action for the given URL."""
        return cls(cls.REMOVE, url)

    @classmethod
    def undo(cls, action):
        """Create the action that reverses the given action.

        Raises ValueError if the action is neither an add nor a remove.
        """
        if action.is_add:
            inverse_type = cls.REMOVE
        elif action.is_remove:
            inverse_type = cls.ADD
        else:
            raise ValueError('Cannot undo action: %r' % action)

        return cls(inverse_type, action.url)
# New entity name for "received" actions
class ReceivedSubscribeAction(SubscribeAction):
    """A SubscribeAction under its own entity name, for received actions."""
class UpdateDeviceAction(object):
    """Database model: a pending upload of device settings.

    device_id   -- identifier of the device to update
    caption     -- caption to set for the device
    device_type -- type to set for the device
    """

    # Field name -> field type
    __slots__ = {
        'device_id': str,
        'caption': str,
        'device_type': str,
    }

    def __init__(self, device_id, caption, device_type):
        self.device_id, self.caption = device_id, caption
        self.device_type = device_type
class EpisodeAction(object):
    """Database model: one action performed on an episode.

    podcast_url, episode_url -- identify the episode
    device_id                -- device the action belongs to
    action                   -- action name (a string)
    timestamp                -- integer timestamp of the action
    started, position, total -- integer playback details; elsewhere in
                                this file they are passed as None for
                                actions that carry no playback details
    """

    # Field name -> field type
    __slots__ = {
        'podcast_url': str,
        'episode_url': str,
        'device_id': str,
        'action': str,
        'timestamp': int,
        'started': int,
        'position': int,
        'total': int,
    }

    def __init__(self, podcast_url, episode_url, device_id,
                 action, timestamp, started, position, total):
        self.podcast_url, self.episode_url = podcast_url, episode_url
        self.device_id, self.action = device_id, action
        self.timestamp = timestamp
        self.started, self.position, self.total = started, position, total
# New entity name for "received" actions
class ReceivedEpisodeAction(EpisodeAction):
    """An EpisodeAction under its own entity name, for received actions."""
class RewrittenUrl(object):
    """Database model: a podcast URL rewrite from old_url to new_url."""

    # Field name -> field type
    __slots__ = {
        'old_url': str,
        'new_url': str,
    }

    def __init__(self, old_url, new_url):
        self.old_url, self.new_url = old_url, new_url
152 # End Database model classes
# Helper class for displaying changes in the UI
class Change(object):
    """Pairs a subscription action with the podcast it affects.

    action  -- object providing "is_add" and "url" (e.g. a SubscribeAction)
    podcast -- optional podcast object; its "title" is used to describe
               removals when available
    """

    def __init__(self, action, podcast=None):
        self.action = action
        self.podcast = podcast

    @property
    def description(self):
        """Translated one-line description of this change."""
        if self.action.is_add:
            return _('Add %s') % self.action.url

        # "podcast" defaults to None; describe the removal by URL in that
        # case instead of failing with an AttributeError on None.title
        if self.podcast is None:
            return _('Remove %s') % self.action.url

        return _('Remove %s') % self.podcast.title
class MygPoClient(object):
    """Queueing and synchronization layer for the mygpo web service.

    Local changes (subscriptions, episode actions, device settings) are
    saved into a minidb store and uploaded by a delayed worker; changes
    received from the server are saved into the same store until the
    frontend retrieves and processes them.
    """

    # File name (inside gpodder.home) of the action store
    STORE_FILE = 'gpodder.net'

    # Seconds a non-forced worker waits before it starts uploading
    FLUSH_TIMEOUT = 60

    # Maximum number of upload passes per worker run
    FLUSH_RETRIES = 3

    def __init__(self, config):
        """Open the action store and create the web service client.

        config -- configuration object; its "mygpo" section is read and
                  it is observed for changes via add_observer()
        """
        self._store = minidb.Store(os.path.join(gpodder.home, self.STORE_FILE))

        self._config = config
        self._client = None

        # Initialize the _client attribute and register with config
        self.on_config_changed()
        assert self._client is not None

        self._config.add_observer(self.on_config_changed)

        self._worker_thread = None
        atexit.register(self._at_exit)

    def create_device(self):
        """Uploads the device changes to the server

        This should be called when device settings change
        or when the mygpo client functionality is enabled.
        """
        # Remove all previous device update actions
        self._store.remove(self._store.load(UpdateDeviceAction))

        # Insert our new update action
        device = self._config.mygpo.device
        action = UpdateDeviceAction(self.device_id, device.caption, device.type)
        self._store.save(action)

    def get_rewritten_urls(self):
        """Returns a list of rewritten URLs for uploads

        This should be called regularly. Every object returned
        should be merged into the database, and the old_url
        should be updated to new_url in every podcast.
        """
        rewritten_urls = self._store.load(RewrittenUrl)
        self._store.remove(rewritten_urls)
        return rewritten_urls

    def process_episode_actions(self, find_episode, on_updated=None):
        """Process received episode actions

        The parameter "find_episode" should be a function accepting
        two parameters (podcast_url and episode_url). It will be used
        to get an episode object that needs to be updated. It should
        return None if the requested episode does not exist.

        The optional callback "on_updated" should accept a single
        parameter (the episode object) and will be called whenever
        the episode data is changed in some way.
        """
        logger.debug('Processing received episode actions...')
        for action in self._store.load(ReceivedEpisodeAction):
            if action.action not in ('play', 'delete'):
                # Ignore all other action types for now
                continue

            episode = find_episode(action.podcast_url, action.episode_url)

            if episode is None:
                # The episode does not exist on this client
                continue

            if action.action == 'play':
                logger.debug('Play action for %s', episode.url)
                episode.mark(is_played=True)

                # Only take over the position if it is newer than ours
                if (action.timestamp > episode.current_position_updated and
                        action.position is not None):
                    logger.debug('Updating position for %s', episode.url)
                    episode.current_position = action.position
                    episode.current_position_updated = action.timestamp

                if action.total:
                    logger.debug('Updating total time for %s', episode.url)
                    episode.total_time = action.total

                episode.save()
                if on_updated is not None:
                    on_updated(episode)
            elif action.action == 'delete':
                if not episode.was_downloaded(and_exists=True):
                    # Set the episode to a "deleted" state
                    logger.debug('Marking as deleted: %s', episode.url)
                    episode.delete_from_disk()
                    episode.save()
                    if on_updated is not None:
                        on_updated(episode)

        # Remove all received episode actions
        self._store.delete(ReceivedEpisodeAction)
        self._store.commit()
        logger.debug('Received episode actions processed.')

    def get_received_actions(self):
        """Returns a list of ReceivedSubscribeAction objects

        The list might be empty. All these actions have to
        be processed. The user should confirm which of these
        actions should be taken, the rest should be rejected.

        Use confirm_received_actions and reject_received_actions
        to return and finalize the actions received by this
        method in order to not receive duplicate actions.
        """
        return self._store.load(ReceivedSubscribeAction)

    def confirm_received_actions(self, actions):
        """Confirm that a list of actions has been processed

        The UI should call this with a list of actions that
        have been accepted by the user and processed by the
        podcast backend.
        """
        # Simply remove the received actions from the queue
        self._store.remove(actions)

    def reject_received_actions(self, actions):
        """Reject (undo) a list of ReceivedSubscribeAction objects

        The UI should call this with a list of actions that
        have been rejected by the user. A reversed set of
        actions will be uploaded to the server so that the
        state on the server matches the state on the client.
        """
        # Create "undo" actions for received subscriptions
        self._store.save(SubscribeAction.undo(a) for a in actions)
        self.flush()

        # After we've handled the reverse-actions, clean up
        self._store.remove(actions)

    @property
    def host(self):
        """The configured server name (config.mygpo.server)."""
        return self._config.mygpo.server

    @property
    def device_id(self):
        """The configured device UID (config.mygpo.device.uid)."""
        return self._config.mygpo.device.uid

    def can_access_webservice(self):
        """Return a true value if sync is enabled and a device UID is set."""
        return self._config.mygpo.enabled and self._config.mygpo.device.uid

    def set_subscriptions(self, urls):
        """Upload "urls" as the device's subscription list (overwriting).

        Raises Exception if web service access is not enabled.
        """
        if not self.can_access_webservice():
            raise Exception('Webservice access not enabled')

        logger.debug('Uploading (overwriting) subscriptions...')
        self._client.put_subscriptions(self.device_id, urls)
        logger.debug('Subscription upload done.')

    def _convert_played_episode(self, episode, start, end, total):
        """Build a "play" EpisodeAction with playback details, timestamped now."""
        return EpisodeAction(episode.channel.url, episode.url,
                             self.device_id, 'play', int(time.time()),
                             start, end, total)

    def _convert_episode(self, episode, action):
        """Build an EpisodeAction without playback details, timestamped now."""
        return EpisodeAction(episode.channel.url, episode.url,
                             self.device_id, action, int(time.time()),
                             None, None, None)

    def on_delete(self, episodes):
        """Queue a "delete" action for every episode in "episodes"."""
        logger.debug('Storing %d episode delete actions', len(episodes))
        self._store.save(self._convert_episode(e, 'delete') for e in episodes)

    def on_download(self, episodes):
        """Queue a "download" action for every episode in "episodes"."""
        logger.debug('Storing %d episode download actions', len(episodes))
        self._store.save(self._convert_episode(e, 'download') for e in episodes)

    def on_playback_full(self, episode, start, end, total):
        """Queue a "play" action including start/end/total details."""
        logger.debug('Storing full episode playback action')
        self._store.save(self._convert_played_episode(episode, start, end, total))

    def on_playback(self, episodes):
        """Queue a "play" action for every episode in "episodes"."""
        logger.debug('Storing %d episode playback actions', len(episodes))
        self._store.save(self._convert_episode(e, 'play') for e in episodes)

    def on_subscribe(self, urls):
        """Queue "add" actions for "urls" and request a flush."""
        # Cancel previously-inserted "remove" actions
        self._store.remove(SubscribeAction.remove(url) for url in urls)

        # Insert new "add" actions
        self._store.save(SubscribeAction.add(url) for url in urls)

        self.flush()

    def on_unsubscribe(self, urls):
        """Queue "remove" actions for "urls" and request a flush."""
        # Cancel previously-inserted "add" actions
        self._store.remove(SubscribeAction.add(url) for url in urls)

        # Insert new "remove" actions
        self._store.save(SubscribeAction.remove(url) for url in urls)

        self.flush()

    def _at_exit(self):
        """Exit hook: run a final forced flush, then commit and close the store."""
        try:
            self._worker_proc(forced=True)
            self._store.commit()
        finally:
            # Close the store even if the final flush failed
            self._store.close()

    def _worker_proc(self, forced=False):
        """Upload all queued actions to the server.

        Unless "forced" is set, the queue is committed and the worker
        sleeps for FLUSH_TIMEOUT seconds before it starts. Uploading is
        attempted up to FLUSH_RETRIES times, until a pass completes
        without any failure.
        """
        if not forced:
            # Store the current contents of the queue database
            self._store.commit()

            logger.debug('Worker thread waiting for timeout')
            time.sleep(self.FLUSH_TIMEOUT)

        # Only work when enabled, UID set and allowed to work
        if self.can_access_webservice() and \
                (self._worker_thread is not None or forced):
            self._worker_thread = None

            logger.debug('Worker thread starting to work...')
            for retry in range(self.FLUSH_RETRIES):
                must_retry = False

                if retry:
                    logger.debug('Retrying flush queue...')

                # Update the device first, so it can be created if new
                for action in self._store.load(UpdateDeviceAction):
                    if self.update_device(action):
                        self._store.remove(action)
                    else:
                        must_retry = True

                # Upload podcast subscription actions
                actions = self._store.load(SubscribeAction)
                if self.synchronize_subscriptions(actions):
                    self._store.remove(actions)
                else:
                    must_retry = True

                # Upload episode actions
                actions = self._store.load(EpisodeAction)
                if self.synchronize_episodes(actions):
                    self._store.remove(actions)
                else:
                    must_retry = True

                if not must_retry:
                    # No more pending actions. Ready to quit.
                    break

            logger.debug('Worker thread finished.')
        else:
            logger.info('Worker thread may not execute (disabled).')

        # Store the current contents of the queue database
        self._store.commit()

    def flush(self, now=False):
        """Request an upload of the queued actions in the background.

        now -- if true, always start a worker, and that worker skips
               the FLUSH_TIMEOUT delay; otherwise a worker is started
               only if none is currently pending
        """
        if not self.can_access_webservice():
            logger.warning('Flush requested, but sync disabled.')
            return

        if self._worker_thread is None or now:
            if now:
                logger.debug('Flushing NOW.')
            else:
                logger.debug('Flush requested.')
            # NOTE(review): the second argument is presumably the daemon
            # flag - confirm against util.run_in_background
            self._worker_thread = util.run_in_background(lambda: self._worker_proc(now), True)
        else:
            logger.debug('Flush requested, already waiting.')

    def on_config_changed(self, name=None, old_value=None, new_value=None):
        """Configuration observer: react to changed "mygpo.*" settings.

        Recreates the API client when the credentials or server change
        (or when no client exists yet), and queues a device update when
        a "mygpo.device.*" setting changes. Calling it without a name
        only makes sure that a client exists.
        """
        if name in ('mygpo.username', 'mygpo.password', 'mygpo.server') \
                or self._client is None:
            self._client = api.MygPodderClient(self._config.mygpo.username,
                    self._config.mygpo.password, self._config.mygpo.server)
            logger.info('Reloading settings.')
        elif name is not None and name.startswith('mygpo.device.'):
            # Update or create the device
            self.create_device()

    def synchronize_episodes(self, actions):
        """Download new episode actions, then upload the given ones.

        Returns True if the upload succeeded, False otherwise. A failed
        download is only logged and does not prevent the upload.
        """
        logger.debug('Starting episode status sync.')

        def convert_to_api(action):
            # Integer UTC timestamp -> ISO 8601 string for the API
            dt = datetime.datetime.utcfromtimestamp(action.timestamp)
            action_ts = mygpoutil.datetime_to_iso8601(dt)
            return api.EpisodeAction(action.podcast_url,
                    action.episode_url, action.action,
                    action.device_id, action_ts,
                    action.started, action.position, action.total)

        def convert_from_api(action):
            # ISO 8601 string from the API -> integer UTC timestamp
            dt = mygpoutil.iso8601_to_datetime(action.timestamp)
            action_ts = calendar.timegm(dt.timetuple())
            return ReceivedEpisodeAction(action.podcast,
                    action.episode, action.device,
                    action.action, action_ts,
                    action.started, action.position, action.total)

        try:
            # Load the "since" value from the database
            since_o = self._store.get(SinceValue, host=self.host,
                                      device_id=self.device_id,
                                      category=SinceValue.EPISODES)

            # Use a default since object for the first-time case
            if since_o is None:
                since_o = SinceValue(self.host, self.device_id, SinceValue.EPISODES)

            # Step 1: Download Episode actions
            try:
                changes = self._client.download_episode_actions(since_o.since)

                received_actions = [convert_from_api(a) for a in changes.actions]
                logger.debug('Received %d episode actions', len(received_actions))
                self._store.save(received_actions)

                # Save the "since" value for later use
                self._store.update(since_o, since=changes.since)
            except Exception:
                # Best effort: a failed download must not block the upload
                logger.warning('Exception while polling for episodes.', exc_info=True)

            # Step 2: Upload Episode actions

            # Convert actions to the mygpoclient format for uploading
            episode_actions = [convert_to_api(a) for a in actions]

            # Upload the episode actions
            self._client.upload_episode_actions(episode_actions)

            # Actions have been uploaded to the server - remove them
            self._store.remove(actions)
            logger.debug('Episode actions have been uploaded to the server.')
            return True
        except Exception as e:
            logger.error('Cannot upload episode actions: %s', str(e), exc_info=True)
            return False

    def synchronize_subscriptions(self, actions):
        """Pull subscription changes, then push the queued ones.

        The queued SubscribeAction objects are re-loaded from the store,
        so the "actions" argument is not used for the upload itself.
        Returns True on success, False if any step raised an exception.
        """
        logger.debug('Starting subscription sync.')
        try:
            # Load the "since" value from the database
            since_o = self._store.get(SinceValue, host=self.host,
                                      device_id=self.device_id,
                                      category=SinceValue.PODCASTS)

            # Use a default since object for the first-time case
            if since_o is None:
                since_o = SinceValue(self.host, self.device_id, SinceValue.PODCASTS)

            # Step 1: Pull updates from the server and notify the frontend
            result = self._client.pull_subscriptions(self.device_id, since_o.since)

            # Update the "since" value in the database
            self._store.update(since_o, since=result.since)

            # Store received actions for later retrieval (and in case we
            # have outdated actions in the database, simply remove them)
            for url in result.add:
                logger.debug('Received add action: %s', url)
                self._store.remove(ReceivedSubscribeAction.remove(url))
                self._store.remove(ReceivedSubscribeAction.add(url))
                self._store.save(ReceivedSubscribeAction.add(url))
            for url in result.remove:
                logger.debug('Received remove action: %s', url)
                self._store.remove(ReceivedSubscribeAction.add(url))
                self._store.remove(ReceivedSubscribeAction.remove(url))
                self._store.save(ReceivedSubscribeAction.remove(url))

            # Step 2: Push updates to the server and rewrite URLs (if any)
            actions = self._store.load(SubscribeAction)

            add = [a.url for a in actions if a.is_add]
            remove = [a.url for a in actions if a.is_remove]

            if add or remove:
                logger.debug('Uploading: +%d / -%d', len(add), len(remove))
                # Only do a push request if something has changed
                result = self._client.update_subscriptions(self.device_id, add, remove)

                # Update the "since" value in the database
                self._store.update(since_o, since=result.since)

                # Store URL rewrites for later retrieval by GUI
                for old_url, new_url in result.update_urls:
                    if new_url:
                        logger.debug('Rewritten URL: %s', new_url)
                        self._store.save(RewrittenUrl(old_url, new_url))

            # Actions have been uploaded to the server - remove them
            self._store.remove(actions)
            logger.debug('All actions have been uploaded to the server.')
            return True
        except Exception as e:
            logger.error('Cannot upload subscriptions: %s', str(e), exc_info=True)
            return False

    def update_device(self, action):
        """Upload the device settings stored in an UpdateDeviceAction.

        Returns True on success, False if the upload raised an exception.
        """
        try:
            logger.debug('Uploading device settings...')
            self._client.update_device_settings(action.device_id,
                    action.caption, action.device_type)
            logger.debug('Device settings uploaded.')
            return True
        except Exception as e:
            # Report the device the request was made for; it can differ
            # from the currently configured one for an older queued action
            logger.error('Cannot update device %s: %s', action.device_id,
                    str(e), exc_info=True)
            return False

    def get_devices(self):
        """Return a list of (device_id, caption, type) tuples from the server."""
        return [(d.device_id, d.caption, d.type)
                for d in self._client.get_devices()]

    def open_website(self):
        """Open the configured server's website via util.open_website()."""
        util.open_website('http://' + self._config.mygpo.server)
class Directory(object):
    """Lookups in the public podcast directory via mygpoclient's PublicClient.

    Both lookups return a list of (title, url) tuples. Entries without
    a URL are skipped, and the URL stands in for a missing title.
    """

    def __init__(self):
        self.client = public.PublicClient()

    def toplist(self):
        """Return the toplist as a list of (title, url) tuples."""
        entries = []
        for podcast in self.client.get_toplist():
            if not podcast.url:
                continue
            entries.append((podcast.title or podcast.url, podcast.url))
        return entries

    def search(self, query):
        """Return the search results for "query" as (title, url) tuples."""
        entries = []
        for podcast in self.client.search_podcasts(query):
            if not podcast.url:
                continue
            entries.append((podcast.title or podcast.url, podcast.url))
        return entries