From: Jason Michalski Date: Sun, 6 Apr 2008 08:38:44 +0000 (-0500) Subject: Updated to use a pool or worker threads X-Git-Url: https://repo.or.cz/w/pyTivo.git/commitdiff_plain/e19fdd68c9407b567c1608c8b3a5a9396ab27552 Updated to use a pool or worker threads --- diff --git a/plugins/webvideo/webvideo.py b/plugins/webvideo/webvideo.py index 0610210..ed28e9c 100644 --- a/plugins/webvideo/webvideo.py +++ b/plugins/webvideo/webvideo.py @@ -6,6 +6,7 @@ import xmpp import threading import xml.etree.ElementTree as ElementTree +import Queue CLASS_NAME = 'WebVideo' @@ -15,10 +16,14 @@ class WebVideo(Video): CONTENT_TYPE = 'x-not-for/tivo' def init(self): - self.sem = threading.Semaphore(1) + self.work_queue = Queue.Queue() + self.download_thread_num = 1 + self.in_progress = {} + self.in_progress_lock = threading.Lock() self.startXMPP() self.xmpp_cdsupdate() + self.startWorkerThreads() def startXMPP(self): m = mind.getMind() @@ -40,11 +45,16 @@ class WebVideo(Video): jid=xmpp.protocol.JID(user_name) cl.sendPresence(jid) - t = threading.Thread(target=self.processXMPP, args=(cl,)) t.setDaemon(True) t.start() + def startWorkerThreads(self): + for i in range(self.download_thread_num): + t = threading.Thread(target=self.processDlRequest) + t.setDaemon(True) + t.start() + def processXMPP(self, client): while client.Process(): pass @@ -59,56 +69,66 @@ class WebVideo(Video): method = getattr(self, method_name) method(xmpp_action) - def xmpp_cdsupdate(self, xml=None): m = mind.getMind() - for request in m.getDownloadRequests(): - t = threading.Thread(target=self.processDlRequest, args=(request,)) - t.setDaemon(True) - t.start() - def processDlRequest(self, data): + self.in_progress_lock.acquire() + try: + for request in m.getDownloadRequests(): + if not request['bodyOfferId'] in self.in_progress: + self.in_progress[request['bodyOfferId']] = True + self.work_queue.put(request) + finally: + self.in_progress_lock.release() + + def processDlRequest(self): import shutil import os.path import urllib2 import urllib - for share_name, settings in config.getShares(): - if settings['type'] == 'webvideo': - break + while True: + data = self.work_queue.get() - self.sem.acquire() + for share_name, settings in config.getShares(): + if settings['type'] == 'webvideo': + break - path = settings['path'] - file_name = os.path.join(path, '%s-%s' % (data['bodyOfferId'] ,data['url'].split('/')[-1])) - print 'downloading %s to %s' % (data['url'], file_name) + path = settings['path'] + file_name = os.path.join(path, '%s-%s' % (data['bodyOfferId'] ,data['url'].split('/')[-1])) - outfile = open(file_name, 'wb') + print 'downloading %s to %s' % (data['url'], file_name) - infile = urllib2.urlopen(data['url']) - shutil.copyfileobj(infile, outfile) + outfile = open(file_name, 'wb') - print 'done downloading %s to %s' % (data['url'], file_name) + infile = urllib2.urlopen(data['url']) + shutil.copyfileobj(infile, outfile) - tsn = data['bodyId'] - file_info = VideoDetails() - file_info.update(self.metadata_full(file_name, tsn)) + print 'done downloading %s to %s' % (data['url'], file_name) - import socket - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - s.connect(('tivo.com',123)) - ip = s.getsockname()[0] - port = config.getPort() + tsn = data['bodyId'] + file_info = VideoDetails() + file_info.update(self.metadata_full(file_name, tsn)) - data['url'] = 'http://%s:%s' % (ip, port) + urllib.quote('/%s/%s' % (share_name, os.path.split(file_name)[-1])) - data['duration'] = file_info['duration'] / 1000 - data['size'] = file_info['size'] + import socket + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.connect(('tivo.com',123)) + ip = s.getsockname()[0] + port = config.getPort() - print data + data['url'] = 'http://%s:%s' % (ip, port) + urllib.quote('/%s/%s' % (share_name, os.path.split(file_name)[-1])) + data['duration'] = file_info['duration'] / 1000 + data['size'] = file_info['size'] - m = mind.getMind() - m.completeDownloadRequest(data) + print data + + m = mind.getMind() + m.completeDownloadRequest(data) - self.sem.release() + self.in_progress_lock.acquire() + try: + del self.in_progress[data['bodyOfferId']] + finally: + self.in_progress_lock.release()