5 __copyright__
= "Copyright (c) 2002-2005 Free Software Foundation, Inc."
7 Straw is free software; you can redistribute it and/or modify it under the
8 terms of the GNU General Public License as published by the Free Software
9 Foundation; either version 2 of the License, or (at your option) any later
12 Straw is distributed in the hope that it will be useful, but WITHOUT ANY
13 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License along with
17 this program; if not, write to the Free Software Foundation, Inc., 59 Temple
18 Place - Suite 330, Boston, MA 02111-1307, USA. """
20 from JobManager
import Job
, TaskResult
, TaskThread
, ThreadPoolJobHandler
21 from threading
import Semaphore
28 def _get_url_schema(url
):
29 return urlparse
.urlsplit(url
, "http")[0]
class Fetcher(object):
    """Abstract base for URL fetchers.

    A concrete fetcher declares which requests it handles (supports), builds
    a task object for such a request (create_task) and performs the fetch
    for a task (fetch). Subclasses override all three; the base versions
    raise NotImplementedError so that a missing override fails loudly instead
    of surfacing later as an AttributeError.
    """

    def __init__(self):
        # Subclasses call Fetcher.__init__(self) explicitly; nothing to set up.
        pass

    def supports(self, **kwargs):
        """Return True if this fetcher can handle the request in kwargs."""
        raise NotImplementedError

    def create_task(self, **kwargs):
        """Return a task object for the request described by kwargs."""
        raise NotImplementedError

    def fetch(self, task):
        """Fetch the resource described by task and return a FetchResult."""
        raise NotImplementedError
# FetchTask: a unit of fetch work pairing a fetcher with a url and
# caller-supplied user_data. Its visible methods render the task as text and
# delegate the actual fetch back to the fetcher (self.fetcher.fetch(self)).
# NOTE(review): this listing is incomplete - `self.url` is read below but its
# assignment is not visible, and the two `return` statements have no visible
# enclosing `def` headers. Restore the missing lines before relying on it.
38 class FetchTask(object):
39 def __init__(self
, fetcher
, url
, user_data
):
40 self
.fetcher
= fetcher
42 self
.user_data
= user_data
# Textual description of the task, e.g. "[fetch: url = (http://...)]".
45 return "[fetch: url = (%s)]" % self
.url
# Delegate to the owning fetcher (HttpFetcher.fetch and FileFetcher.fetch
# both return a FetchResult).
48 return self
.fetcher
.fetch(self
)
class FetchResult(object):
    """Outcome of a single fetch: the retrieved content and any error.

    Fetchers build it as FetchResult(content, error); both values are kept
    as attributes so callers can inspect either one.
    """

    def __init__(self, content, error):
        self.content = content
        # Store the error as well, so the failure reason handed over by the
        # fetchers is not discarded.
        self.error = error
# HttpFetcher: Fetcher for "http" and "https" URLs; builds HttpFetchTask
# objects and performs GET requests through httplib2.
# NOTE(review): this listing is incomplete - the `def __init__` header around
# the `Fetcher.__init__(self)` call is missing, and inside fetch() the lines
# that assign `url` and `error` (and anything around the request) are not
# visible. Restore them before relying on this block.
55 class HttpFetcher(Fetcher
):
57 Fetcher
.__init
__(self
)
# Accept only http/https schemes. _get_url_schema passes "http" as the
# default scheme, so a URL without a scheme is also accepted here.
59 def supports(self
, **kwargs
):
60 schema
= _get_url_schema(kwargs
["url"])
61 return schema
== "http" or schema
== "https"
63 def create_task(self
, **kwargs
):
64 return HttpFetchTask(self
, **kwargs
)
# fetch(): GET the task's url with an httplib2.Http client constructed with
# CACHE_DIR (<Config.straw_home()>/cache) and return FetchResult(content, error).
66 def fetch(self
, task
):
68 CACHE_DIR
= os
.path
.join(Config
.straw_home(), 'cache')
69 http
= httplib2
.Http(CACHE_DIR
)
# NOTE(review): HttpFetchTask sets credentials to None when none were given,
# so this tuple unpacking would raise TypeError unless it sits under a guard
# that is missing from this listing - confirm.
75 username
, password
, domain
= task
.credentials
76 http
.add_credentials(username
, password
, domain
)
79 response
, content
= http
.request(url
, "GET")
# NOTE(review): the body of this status check is not visible; presumably it
# records an HTTP error status (>= 400) in `error` - confirm.
83 if not error
and response
.status
>= 400:
86 return FetchResult(content
, error
)
class HttpFetchTask(FetchTask):
    """Fetch task for http(s) URLs, optionally carrying credentials.

    Keyword arguments:
      url         -- URL to fetch (required)
      user_data   -- caller data handed through to FetchTask (required)
      credentials -- optional value that HttpFetcher.fetch unpacks as
                     (username, password, domain); None when omitted
    """

    def __init__(self, fetcher, **kwargs):
        FetchTask.__init__(self, fetcher, kwargs["url"], kwargs["user_data"])
        # dict.get replaces the deprecated, Python-2-only has_key() check;
        # a missing key still yields None, exactly as before.
        self.credentials = kwargs.get("credentials")
class FileFetchTask(FetchTask):
    """Fetch task for "file" URLs; adds nothing beyond FetchTask itself.

    Requires the `url` and `user_data` keyword arguments.
    """

    def __init__(self, fetcher, **kwargs):
        target = kwargs["url"]
        payload = kwargs["user_data"]
        FetchTask.__init__(self, fetcher, target, payload)
# FileFetcher: Fetcher for "file" URLs; builds FileFetchTask objects and opens
# the referenced local file.
# NOTE(review): this listing is incomplete - the `def __init__` header around
# `Fetcher.__init__(self)` is missing, as are the fetch() lines that assign
# `url`, `content` and `error` and (presumably) close `f`.
101 class FileFetcher(Fetcher
):
103 Fetcher
.__init
__(self
)
105 def supports(self
, **kwargs
):
106 schema
= _get_url_schema(kwargs
["url"])
107 return schema
== "file"
109 def create_task(self
, **kwargs
):
110 return FileFetchTask(self
, **kwargs
)
112 def fetch(self
, task
):
# Turn the URL into a filesystem path by dropping a literal "file://" prefix.
# NOTE(review): supports() accepts anything whose parsed scheme is "file"
# (urlsplit lower-cases it), so "FILE://..." or "file:/path" would also reach
# this line and be sliced into a wrong path - consider taking the path from
# urlparse instead. Also confirm `f` is always closed (try/finally, or `with`
# on Python 2.5+).
118 f
= open(url
[len("file://"):], "r")
124 return FetchResult(content
, error
)
# Module-wide fetcher instances; create_task() consults them in turn.
_file_fetcher = FileFetcher()
_http_fetcher = HttpFetcher()
# FetchJobHandler: ThreadPoolJobHandler subclass that uses FetchTaskThread as
# its task class and puts each of the job's tasks on self.task_queue.
# NOTE(review): lines are missing from this listing - in particular, the
# method header that encloses the `for task in self.job.tasks` loop is not
# visible, so when the tasks get queued cannot be read from here.
129 class FetchJobHandler(ThreadPoolJobHandler
):
# `id` shadows the builtin; it is kept because it is forwarded unchanged to
# ThreadPoolJobHandler.__init__.
132 def __init__(self
, id, job
):
133 ThreadPoolJobHandler
.__init
__(self
, id, job
)
# Thread class used to process this handler's tasks (presumably instantiated
# by ThreadPoolJobHandler - confirm).
136 self
.task_class
= FetchTaskThread
# Enqueue every task of the job for processing.
139 for task
in self
.job
.tasks
:
140 self
.task_queue
.put(task
)
class FetchTaskThread(TaskThread):
    """TaskThread that FetchJobHandler uses as its task class.

    A task is processed by handing it to the task's own fetcher; the
    fetcher's result is returned unchanged.
    """

    def __init__(self, handler):
        TaskThread.__init__(self, handler)

    def _process(self, task):
        owner = task.fetcher
        outcome = owner.fetch(task)
        return outcome
# Register FetchJobHandler with the JobManager when this module is imported.
JobManager.register_handler(FetchJobHandler)
def create_task(**kwargs):
    """Build a fetch task for the request described by kwargs.

    The HTTP fetcher is consulted before the file fetcher. The first one
    whose supports() accepts kwargs creates and returns the task. Returns
    None when no fetcher supports the request.
    """
    for candidate in (_http_fetcher, _file_fetcher):
        if not candidate.supports(**kwargs):
            continue
        return candidate.create_task(**kwargs)
    return None
# fetch(tasks, observers={}, wait=False): attach `observers` to the job and
# start it via JobManager.start(job, wait=wait), returning whatever that returns.
# NOTE(review): `job` is not created in the visible lines - its construction
# (presumably from `tasks`) is missing from this listing.
# NOTE(review): `observers={}` is a mutable default. Every call that omits it
# assigns the same dict object to job.observers, so a mutation made for one
# job is seen by later ones. Prefer `observers=None` and create a fresh dict
# per call.
156 def fetch(tasks
, observers
= {}, wait
= False):
159 job
.observers
= observers
160 return JobManager
.start(job
, wait
= wait
)