Updated to use a pool or worker threads
authorJason Michalski <armooo@armooo.net>
Sun, 6 Apr 2008 08:38:44 +0000 (6 03:38 -0500)
committerJason Michalski <armooo@armooo.net>
Sun, 6 Apr 2008 08:38:44 +0000 (6 03:38 -0500)
plugins/webvideo/webvideo.py

index 0610210..ed28e9c 100644 (file)
@@ -6,6 +6,7 @@ import xmpp
 
 import threading
 import xml.etree.ElementTree as ElementTree
+import Queue
 
 CLASS_NAME = 'WebVideo'
 
@@ -15,10 +16,14 @@ class WebVideo(Video):
     CONTENT_TYPE = 'x-not-for/tivo'
 
     def init(self):
-        self.sem = threading.Semaphore(1)
+        self.work_queue = Queue.Queue()
+        self.download_thread_num = 1
+        self.in_progress = {}
+        self.in_progress_lock = threading.Lock()
 
         self.startXMPP()
         self.xmpp_cdsupdate()
+        self.startWorkerThreads()
 
     def startXMPP(self):
         m = mind.getMind()
@@ -40,11 +45,16 @@ class WebVideo(Video):
             jid=xmpp.protocol.JID(user_name)
             cl.sendPresence(jid)
 
-
         t = threading.Thread(target=self.processXMPP, args=(cl,))
         t.setDaemon(True)
         t.start()
 
+    def startWorkerThreads(self):
+        for i in range(self.download_thread_num):
+            t = threading.Thread(target=self.processDlRequest)
+            t.setDaemon(True)
+            t.start()
+
     def processXMPP(self, client):
         while client.Process():
             pass
@@ -59,56 +69,66 @@ class WebVideo(Video):
         method = getattr(self, method_name)
         method(xmpp_action)
 
-
     def xmpp_cdsupdate(self, xml=None):
         m = mind.getMind()
-        for request in m.getDownloadRequests():
-            t = threading.Thread(target=self.processDlRequest, args=(request,))
-            t.setDaemon(True)
-            t.start()
 
-    def processDlRequest(self, data):
+        self.in_progress_lock.acquire()
+        try:
+            for request in m.getDownloadRequests():
+                if not request['bodyOfferId'] in self.in_progress:
+                    self.in_progress[request['bodyOfferId']] = True
+                    self.work_queue.put(request)
+        finally:
+            self.in_progress_lock.release()
+
+    def processDlRequest(self):
         import shutil
         import os.path
         import urllib2
         import urllib
 
-        for share_name, settings in config.getShares():
-            if settings['type'] == 'webvideo':
-                break
+        while True:
+            data = self.work_queue.get()
 
-        self.sem.acquire()
+            for share_name, settings in config.getShares():
+                if settings['type'] == 'webvideo':
+                    break
 
-        path = settings['path']
-        file_name = os.path.join(path, '%s-%s' % (data['bodyOfferId'] ,data['url'].split('/')[-1]))
 
-        print 'downloading %s to %s' % (data['url'], file_name)
+            path = settings['path']
+            file_name = os.path.join(path, '%s-%s' % (data['bodyOfferId'] ,data['url'].split('/')[-1]))
 
-        outfile = open(file_name, 'wb')
+            print 'downloading %s to %s' % (data['url'], file_name)
 
-        infile = urllib2.urlopen(data['url'])
-        shutil.copyfileobj(infile, outfile)
+            outfile = open(file_name, 'wb')
 
-        print 'done downloading %s to %s' % (data['url'], file_name)
+            infile = urllib2.urlopen(data['url'])
+            shutil.copyfileobj(infile, outfile)
 
-        tsn = data['bodyId']
-        file_info = VideoDetails()
-        file_info.update(self.metadata_full(file_name, tsn))
+            print 'done downloading %s to %s' % (data['url'], file_name)
 
-        import socket
-        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-        s.connect(('tivo.com',123))
-        ip = s.getsockname()[0]
-        port = config.getPort()
+            tsn = data['bodyId']
+            file_info = VideoDetails()
+            file_info.update(self.metadata_full(file_name, tsn))
 
-        data['url'] = 'http://%s:%s' % (ip, port) + urllib.quote('/%s/%s' % (share_name, os.path.split(file_name)[-1]))
-        data['duration'] = file_info['duration'] / 1000
-        data['size'] = file_info['size']
+            import socket
+            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+            s.connect(('tivo.com',123))
+            ip = s.getsockname()[0]
+            port = config.getPort()
 
-        print data
+            data['url'] = 'http://%s:%s' % (ip, port) + urllib.quote('/%s/%s' % (share_name, os.path.split(file_name)[-1]))
+            data['duration'] = file_info['duration'] / 1000
+            data['size'] = file_info['size']
 
-        m = mind.getMind()
-        m.completeDownloadRequest(data)
+            print data
+
+            m = mind.getMind()
+            m.completeDownloadRequest(data)
 
-        self.sem.release()
+            self.in_progress_lock.acquire()
+            try:
+                del self.in_progress[data['bodyOfferId']]
+            finally:
+                self.in_progress_lock.release()