1 # -*- coding: utf-8 -*-
3 # gPodder - A media aggregator and podcast client
4 # Copyright (c) 2005-2010 Thomas Perl and the gPodder Team
6 # gPodder is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # gPodder is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
22 # download.py -- Download queue management
23 # Thomas Perl <thp@perli.net> 2007-09-15
25 # Based on libwget.py (2005-10-29)
28 from __future__
import with_statement
30 from gpodder
.liblogger
import log
31 from gpodder
import util
32 from gpodder
import youtube
48 from xml
.sax
import saxutils
52 def get_header_param(headers
, param
, header_name
):
53 """Extract a HTTP header parameter from a dict
55 Uses the "email" module to retrieve parameters
56 from HTTP headers. This can be used to get the
57 "filename" parameter of the "content-disposition"
58 header for downloads to pick a good filename.
60 Returns None if the filename cannot be retrieved.
63 headers_string
= ['%s:%s'%(k
,v
) for k
,v
in headers
.items()]
64 msg
= email
.message_from_string('\n'.join(headers_string
))
65 if header_name
in msg
:
66 value
= msg
.get_param(param
, header
=header_name
)
69 decoded_list
= email
.Header
.decode_header(value
)
71 for part
, encoding
in decoded_list
:
73 value
.append(part
.decode(encoding
))
75 value
.append(unicode(part
))
76 return u
''.join(value
)
78 log('Error trying to get %s from %s: %s', \
79 param
, header_name
, str(e
), traceback
=True)
83 class ContentRange(object):
85 # http://svn.pythonpaste.org/Paste/WebOb/trunk/webob/byterange.py
87 # Copyright (c) 2007 Ian Bicking and Contributors
89 # Permission is hereby granted, free of charge, to any person obtaining
90 # a copy of this software and associated documentation files (the
91 # "Software"), to deal in the Software without restriction, including
92 # without limitation the rights to use, copy, modify, merge, publish,
93 # distribute, sublicense, and/or sell copies of the Software, and to
94 # permit persons to whom the Software is furnished to do so, subject to
95 # the following conditions:
97 # The above copyright notice and this permission notice shall be
98 # included in all copies or substantial portions of the Software.
100 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
101 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
102 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
103 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
104 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
105 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
106 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
108 Represents the Content-Range header
110 This header is ``start-stop/length``, where stop and length can be
111 ``*`` (represented as None in the attributes).
114 def __init__(self
, start
, stop
, length
):
115 assert start
>= 0, "Bad start: %r" % start
116 assert stop
is None or (stop
>= 0 and stop
>= start
), (
117 "Bad stop: %r" % stop
)
124 self
.__class
__.__name
__,
128 if self
.stop
is None:
132 if self
.length
is None:
136 return 'bytes %s-%s/%s' % (self
.start
, stop
, length
)
140 Mostly so you can unpack this, like:
142 start, stop, length = res.content_range
144 return iter([self
.start
, self
.stop
, self
.length
])
147 def parse(cls
, value
):
149 Parse the header. May return None if it cannot parse.
153 value
= value
.strip()
154 if not value
.startswith('bytes '):
157 value
= value
[len('bytes '):].strip()
159 # Invalid, no length given
161 range, length
= value
.split('/', 1)
165 start
, end
= range.split('-', 1)
180 return cls(start
, None, length
)
182 return cls(start
, end
-1, length
)
class DownloadCancelledException(Exception):
    """Raised to abort a running download whose task was cancelled or paused."""
class AuthenticationError(Exception):
    """Raised when HTTP authentication keeps failing (more than three prompts)."""
188 class gPodderDownloadHTTPError(Exception):
189 def __init__(self
, url
, error_code
, error_message
):
191 self
.error_code
= error_code
192 self
.error_message
= error_message
194 class DownloadURLOpener(urllib
.FancyURLopener
):
195 version
= gpodder
.user_agent
197 # Sometimes URLs are not escaped correctly - try to fix them
198 # (see RFC2396; Section 2.4.3. Excluded US-ASCII Characters)
199 # FYI: The omission of "%" in the list is to avoid double escaping!
200 ESCAPE_CHARS
= dict((ord(c
), u
'%%%x'%ord(c
)) for c
in u
' <>#"{}|\\^[]`')
202 def __init__( self
, channel
):
203 self
.channel
= channel
204 self
._auth
_retry
_counter
= 0
205 urllib
.FancyURLopener
.__init
__(self
, None)
207 def http_error_default(self
, url
, fp
, errcode
, errmsg
, headers
):
209 FancyURLopener by default does not raise an exception when
210 there is some unknown HTTP error code. We want to override
211 this and provide a function to log the error and raise an
212 exception, so we don't download the HTTP error page here.
214 # The following two lines are copied from urllib.URLopener's
215 # implementation of http_error_default
218 raise gPodderDownloadHTTPError(url
, errcode
, errmsg
)
220 def redirect_internal(self
, url
, fp
, errcode
, errmsg
, headers
, data
):
221 """ This is the exact same function that's included with urllib
222 except with "void = fp.read()" commented out. """
224 if 'location' in headers
:
225 newurl
= headers
['location']
226 elif 'uri' in headers
:
227 newurl
= headers
['uri']
231 # This blocks forever(?) with certain servers (see bug #465)
235 # In case the server sent a relative URL, join with original:
236 newurl
= urlparse
.urljoin(self
.type + ":" + url
, newurl
)
237 return self
.open(newurl
)
239 # The following is based on Python's urllib.py "URLopener.retrieve"
240 # Also based on http://mail.python.org/pipermail/python-list/2001-October/110069.html
242 def http_error_206(self
, url
, fp
, errcode
, errmsg
, headers
, data
=None):
243 # The next line is taken from urllib's URLopener.open_http
244 # method, at the end after the line "if errcode == 200:"
245 return urllib
.addinfourl(fp
, headers
, 'http:' + url
)
247 def retrieve_resume(self
, url
, filename
, reporthook
=None, data
=None):
248 """Download files from an URL; return (headers, real_url)
250 Resumes a download if the local filename exists and
251 the server supports download resuming.
256 if os
.path
.exists(filename
):
258 current_size
= os
.path
.getsize(filename
)
259 tfp
= open(filename
, 'ab')
260 #If the file exists, then only download the remainder
262 self
.addheader('Range', 'bytes=%s-' % (current_size
))
264 log('Cannot open file for resuming: %s', filename
, sender
=self
, traceback
=True)
269 tfp
= open(filename
, 'wb')
271 # Fix a problem with bad URLs that are not encoded correctly (bug 549)
272 url
= url
.decode('ascii', 'ignore')
273 url
= url
.translate(self
.ESCAPE_CHARS
)
274 url
= url
.encode('ascii')
276 url
= urllib
.unwrap(urllib
.toBytes(url
))
277 fp
= self
.open(url
, data
)
281 # We told the server to resume - see if she agrees
282 # See RFC2616 (206 Partial Content + Section 14.16)
283 # XXX check status code here, too...
284 range = ContentRange
.parse(headers
.get('content-range', ''))
285 if range is None or range.start
!= current_size
:
286 # Ok, that did not work. Reset the download
287 # TODO: seek and truncate if content-range differs from request
289 tfp
= open(filename
, 'wb')
291 log('Cannot resume. Missing or wrong Content-Range header (RFC2616)', sender
=self
)
293 result
= headers
, fp
.geturl()
297 blocknum
= int(current_size
/bs
)
299 if "content-length" in headers
:
300 size
= int(headers
["Content-Length"]) + current_size
301 reporthook(blocknum
, bs
, size
)
302 while read
< size
or size
== -1:
306 block
= fp
.read(min(size
-read
, bs
))
313 reporthook(blocknum
, bs
, size
)
319 # raise exception if actual size does not match content-length header
320 if size
>= 0 and read
< size
:
321 raise urllib
.ContentTooShortError("retrieval incomplete: got only %i out "
322 "of %i bytes" % (read
, size
), result
)
326 # end code based on urllib.py
328 def prompt_user_passwd( self
, host
, realm
):
329 # Keep track of authentication attempts, fail after the third one
330 self
._auth
_retry
_counter
+= 1
331 if self
._auth
_retry
_counter
> 3:
332 raise AuthenticationError(_('Wrong username/password'))
334 if self
.channel
.username
or self
.channel
.password
:
335 log( 'Authenticating as "%s" to "%s" for realm "%s".', self
.channel
.username
, host
, realm
, sender
= self
)
336 return ( self
.channel
.username
, self
.channel
.password
)
341 class DownloadQueueWorker(threading
.Thread
):
342 def __init__(self
, queue
, exit_callback
, continue_check_callback
):
343 threading
.Thread
.__init
__(self
)
345 self
.exit_callback
= exit_callback
346 self
.continue_check_callback
= continue_check_callback
349 log('Running new thread: %s', self
.getName(), sender
=self
)
351 # Check if this thread is allowed to continue accepting tasks
352 if not self
.continue_check_callback(self
):
353 log('%s must not accept new tasks.', self
.getName(), sender
=self
)
357 task
= self
.queue
.pop()
358 log('%s is processing: %s', self
.getName(), task
, sender
=self
)
360 except IndexError, e
:
361 log('No more tasks for %s to carry out.', self
.getName(), sender
=self
)
363 self
.exit_callback(self
)
366 class DownloadQueueManager(object):
367 def __init__(self
, config
):
368 self
._config
= config
369 self
.tasks
= collections
.deque()
371 self
.worker_threads_access
= threading
.RLock()
372 self
.worker_threads
= []
374 def __exit_callback(self
, worker_thread
):
375 with self
.worker_threads_access
:
376 self
.worker_threads
.remove(worker_thread
)
378 def __continue_check_callback(self
, worker_thread
):
379 with self
.worker_threads_access
:
380 if len(self
.worker_threads
) > self
._config
.max_downloads
and \
381 self
._config
.max_downloads_enabled
:
382 self
.worker_threads
.remove(worker_thread
)
387 def spawn_threads(self
):
388 with self
.worker_threads_access
:
389 if not len(self
.tasks
):
392 if len(self
.worker_threads
) == 0 or \
393 len(self
.worker_threads
) < self
._config
.max_downloads
or \
394 not self
._config
.max_downloads_enabled
:
395 # We have to create a new thread here, there's work to do
396 log('I am going to spawn a new worker thread.', sender
=self
)
397 worker
= DownloadQueueWorker(self
.tasks
, self
.__exit
_callback
, \
398 self
.__continue
_check
_callback
)
399 self
.worker_threads
.append(worker
)
402 def are_queued_or_active_tasks(self
):
403 with self
.worker_threads_access
:
404 return len(self
.worker_threads
) > 0
406 def add_task(self
, task
):
407 if task
.status
!= DownloadTask
.INIT
:
408 # This task is old so update episode from db
409 task
.episode
.reload_from_db()
410 task
.status
= DownloadTask
.QUEUED
411 self
.tasks
.appendleft(task
)
415 class DownloadTask(object):
416 """An object representing the download task of an episode
418 You can create a new download task like this:
420 task = DownloadTask(episode, gpodder.config.Config(CONFIGFILE))
421 task.status = DownloadTask.QUEUED
424 While the download is in progress, you can access its properties:
426 task.total_size # in bytes
427 task.progress # from 0.0 to 1.0
428 task.speed # in bytes per second
429 str(task) # name of the episode
430 task.status # current status
431 task.status_changed # True if the status has been changed (see below)
432 task.url # URL of the episode being downloaded
433 task.podcast_url # URL of the podcast this download belongs to
435 You can cancel a running download task by setting its status:
437 task.status = DownloadTask.CANCELLED
439 The task will then abort as soon as possible (due to the nature
440 of downloading data, this can take a while when the Internet is
443 The "status_changed" attribute gets set to True everytime the
444 "status" attribute changes its value. After you get the value of
445 the "status_changed" attribute, it is always reset to False:
447 if task.status_changed:
448 new_status = task.status
449 # .. update the UI accordingly ..
451 Obviously, this also means that you must have at most *one*
452 place in your UI code where you check for status changes and
453 broadcast the status updates from there.
455 While the download is taking place and after the .run() method
456 has finished, you can get the final status to check if the download
459 if task.status == DownloadTask.DONE:
460 # .. everything ok ..
461 elif task.status == DownloadTask.FAILED:
462 # .. an error happened, and the
463 # error_message attribute is set ..
464 print task.error_message
465 elif task.status == DownloadTask.PAUSED:
466 # .. user paused the download ..
467 elif task.status == DownloadTask.CANCELLED:
468 # .. user cancelled the download ..
470 The difference between cancelling and pausing a DownloadTask is
471 that the temporary file gets deleted when cancelling, but does
472 not get deleted when pausing.
474 Be sure to call .removed_from_list() on this task when removing
475 it from the UI, so that it can carry out any pending clean-up
476 actions (e.g. removing the temporary file when the task has not
477 finished successfully; i.e. task.status != DownloadTask.DONE).
479 # Possible states this download task can be in
480 STATUS_MESSAGE
= (_('Added'), _('Queued'), _('Downloading'),
481 _('Finished'), _('Failed'), _('Cancelled'), _('Paused'))
482 (INIT
, QUEUED
, DOWNLOADING
, DONE
, FAILED
, CANCELLED
, PAUSED
) = range(7)
485 return self
.__episode
.title
487 def __get_status(self
):
490 def __set_status(self
, status
):
491 if status
!= self
.__status
:
492 self
.__status
_changed
= True
493 self
.__status
= status
495 status
= property(fget
=__get_status
, fset
=__set_status
)
497 def __get_status_changed(self
):
498 if self
.__status
_changed
:
499 self
.__status
_changed
= False
504 status_changed
= property(fget
=__get_status_changed
)
507 return self
.__episode
.url
509 url
= property(fget
=__get_url
)
511 def __get_podcast_url(self
):
512 return self
.__episode
.channel
.url
514 podcast_url
= property(fget
=__get_podcast_url
)
516 def __get_episode(self
):
517 return self
.__episode
519 episode
= property(fget
=__get_episode
)
521 def removed_from_list(self
):
522 if self
.status
!= self
.DONE
:
523 util
.delete_file(self
.tempname
)
525 def __init__(self
, episode
, config
):
526 self
.__status
= DownloadTask
.INIT
527 self
.__status
_changed
= True
528 self
.__episode
= episode
529 self
._config
= config
531 # Set names for the downloads list
532 self
.markup_name
= saxutils
.escape(self
.__episode
.title
)
533 self
.markup_podcast_name
= saxutils
.escape(self
.__episode
.channel
.title
)
535 # Create the target filename and save it in the database
536 self
.filename
= self
.__episode
.local_filename(create
=True)
537 self
.tempname
= self
.filename
+ '.partial'
539 self
.total_size
= self
.__episode
.length
542 self
.error_message
= None
544 # Variables for speed limit and speed calculation
545 self
.__start
_time
= 0
546 self
.__start
_blocks
= 0
547 self
.__limit
_rate
_value
= self
._config
.limit_rate_value
548 self
.__limit
_rate
= self
._config
.limit_rate
550 # If the tempname already exists, set progress accordingly
551 if os
.path
.exists(self
.tempname
):
553 already_downloaded
= os
.path
.getsize(self
.tempname
)
554 if self
.total_size
> 0:
555 self
.progress
= max(0.0, min(1.0, float(already_downloaded
)/self
.total_size
))
556 except OSError, os_error
:
557 log('Error while getting size for existing file: %s', os_error
, sender
=self
)
559 # "touch self.tempname", so we also get partial
560 # files for resuming when the file is queued
561 open(self
.tempname
, 'w').close()
563 def status_updated(self
, count
, blockSize
, totalSize
):
564 # We see a different "total size" while downloading,
565 # so correct the total size variable in the thread
566 if totalSize
!= self
.total_size
and totalSize
> 0:
567 self
.total_size
= float(totalSize
)
569 if self
.total_size
> 0:
570 self
.progress
= max(0.0, min(1.0, float(count
*blockSize
)/self
.total_size
))
572 self
.calculate_speed(count
, blockSize
)
574 if self
.status
== DownloadTask
.CANCELLED
:
575 raise DownloadCancelledException()
577 if self
.status
== DownloadTask
.PAUSED
:
578 raise DownloadCancelledException()
580 def calculate_speed(self
, count
, blockSize
):
583 if self
.__start
_time
> 0:
584 # Has rate limiting been enabled or disabled?
585 if self
.__limit
_rate
!= self
._config
.limit_rate
:
586 # If it has been enabled then reset base time and block count
587 if self
._config
.limit_rate
:
588 self
.__start
_time
= now
589 self
.__start
_blocks
= count
590 self
.__limit
_rate
= self
._config
.limit_rate
592 # Has the rate been changed and are we currently limiting?
593 if self
.__limit
_rate
_value
!= self
._config
.limit_rate_value
and self
.__limit
_rate
:
594 self
.__start
_time
= now
595 self
.__start
_blocks
= count
596 self
.__limit
_rate
_value
= self
._config
.limit_rate_value
598 passed
= now
- self
.__start
_time
600 speed
= ((count
-self
.__start
_blocks
)*blockSize
)/passed
604 self
.__start
_time
= now
605 self
.__start
_blocks
= count
606 passed
= now
- self
.__start
_time
607 speed
= count
*blockSize
609 self
.speed
= float(speed
)
611 if self
._config
.limit_rate
and speed
> self
._config
.limit_rate_value
:
612 # calculate the time that should have passed to reach
613 # the desired download rate and wait if necessary
614 should_have_passed
= float((count
-self
.__start
_blocks
)*blockSize
)/(self
._config
.limit_rate_value
*1024.0)
615 if should_have_passed
> passed
:
616 # sleep a maximum of 10 seconds to not cause time-outs
617 delay
= min(10.0, float(should_have_passed
-passed
))
621 # Speed calculation (re-)starts here
622 self
.__start
_time
= 0
623 self
.__start
_blocks
= 0
625 # If the download has already been cancelled, skip it
626 if self
.status
== DownloadTask
.CANCELLED
:
627 util
.delete_file(self
.tempname
)
632 # We only start this download if its status is "queued"
633 if self
.status
!= DownloadTask
.QUEUED
:
636 # We are downloading this file right now
637 self
.status
= DownloadTask
.DOWNLOADING
640 # Resolve URL and start downloading the episode
641 url
= youtube
.get_real_download_url(self
.__episode
.url
, \
642 self
._config
.youtube_preferred_fmt_id
)
643 downloader
= DownloadURLOpener(self
.__episode
.channel
)
644 headers
, real_url
= downloader
.retrieve_resume(url
, \
645 self
.tempname
, reporthook
=self
.status_updated
)
647 new_mimetype
= headers
.get('content-type', self
.__episode
.mimetype
)
648 old_mimetype
= self
.__episode
.mimetype
649 if new_mimetype
!= old_mimetype
:
650 log('Correcting mime type: %s => %s', old_mimetype
, new_mimetype
, sender
=self
)
651 old_extension
= self
.__episode
.extension()
652 self
.__episode
.mimetype
= new_mimetype
653 new_extension
= self
.__episode
.extension()
655 # If the desired filename extension changed due to the new mimetype,
656 # we force an update of the local filename to fix the extension
657 if old_extension
!= new_extension
:
658 self
.filename
= self
.__episode
.local_filename(create
=True, force_update
=True)
660 # TODO: Check if "real_url" is different from "url" and if it is,
661 # see if we can get a better episode filename out of it
663 # Look at the Content-disposition header; use if if available
664 disposition_filename
= get_header_param(headers
, \
665 'filename', 'content-disposition')
667 if disposition_filename
is not None:
668 # The server specifies a download filename - try to use it
669 disposition_filename
= os
.path
.basename(disposition_filename
)
670 self
.filename
= self
.__episode
.local_filename(create
=True, \
671 force_update
=True, template
=disposition_filename
)
672 new_mimetype
, encoding
= mimetypes
.guess_type(self
.filename
)
673 if new_mimetype
is not None:
674 log('Using content-disposition mimetype: %s',
675 new_mimetype
, sender
=self
)
676 self
.__episode
.set_mimetype(new_mimetype
, commit
=True)
678 shutil
.move(self
.tempname
, self
.filename
)
680 # Get the _real_ filesize once we actually have the file
681 self
.__episode
.length
= os
.path
.getsize(self
.filename
)
682 self
.__episode
.channel
.addDownloadedItem(self
.__episode
)
684 # If a user command has been defined, execute the command setting some environment variables
685 if len(self
._config
.cmd_download_complete
) > 0:
686 os
.environ
["GPODDER_EPISODE_URL"]=self
.__episode
.url
or ''
687 os
.environ
["GPODDER_EPISODE_TITLE"]=self
.__episode
.title
or ''
688 os
.environ
["GPODDER_EPISODE_FILENAME"]=self
.filename
or ''
689 os
.environ
["GPODDER_EPISODE_PUBDATE"]=str(int(self
.__episode
.pubDate
))
690 os
.environ
["GPODDER_EPISODE_LINK"]=self
.__episode
.link
or ''
691 os
.environ
["GPODDER_EPISODE_DESC"]=self
.__episode
.description
or ''
692 os
.environ
["GPODDER_CHANNEL_TITLE"]=self
.__episode
.channel
.title
or ''
693 util
.run_external_command(self
._config
.cmd_download_complete
)
694 except DownloadCancelledException
:
695 log('Download has been cancelled/paused: %s', self
, sender
=self
)
696 if self
.status
== DownloadTask
.CANCELLED
:
697 util
.delete_file(self
.tempname
)
700 except urllib
.ContentTooShortError
, ctse
:
701 self
.status
= DownloadTask
.FAILED
702 self
.error_message
= _('Missing content from server')
704 log( 'Error "%s" while downloading "%s": %s', ioe
.strerror
, self
.__episode
.title
, ioe
.filename
, sender
=self
, traceback
=True)
705 self
.status
= DownloadTask
.FAILED
706 d
= {'error': ioe
.strerror
, 'filename': ioe
.filename
}
707 self
.error_message
= _('I/O Error: %(error)s: %(filename)s') % d
708 except gPodderDownloadHTTPError
, gdhe
:
709 log( 'HTTP error %s while downloading "%s": %s', gdhe
.error_code
, self
.__episode
.title
, gdhe
.error_message
, sender
=self
)
710 self
.status
= DownloadTask
.FAILED
711 d
= {'code': gdhe
.error_code
, 'message': gdhe
.error_message
}
712 self
.error_message
= _('HTTP Error %(code)s: %(message)s') % d
714 self
.status
= DownloadTask
.FAILED
715 self
.error_message
= _('Error: %s') % (e
.message
,)
717 if self
.status
== DownloadTask
.DOWNLOADING
:
718 # Everything went well - we're done
719 self
.status
= DownloadTask
.DONE
720 if self
.total_size
<= 0:
721 self
.total_size
= util
.calculate_size(self
.filename
)
722 log('Total size updated to %d', self
.total_size
, sender
=self
)
728 # We finished, but not successfully (at least not really)