Evict resources from resource pool after timeout
[chromium-blink-merge.git] / net / proxy / multi_threaded_proxy_resolver.cc
blob6c6fa699534695e3fd802ccb03ff14f90fe43de0
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/proxy/multi_threaded_proxy_resolver.h"
7 #include <deque>
8 #include <vector>
10 #include "base/bind.h"
11 #include "base/bind_helpers.h"
12 #include "base/location.h"
13 #include "base/single_thread_task_runner.h"
14 #include "base/stl_util.h"
15 #include "base/strings/string_util.h"
16 #include "base/strings/stringprintf.h"
17 #include "base/thread_task_runner_handle.h"
18 #include "base/threading/non_thread_safe.h"
19 #include "base/threading/thread.h"
20 #include "base/threading/thread_restrictions.h"
21 #include "net/base/net_errors.h"
22 #include "net/log/net_log.h"
23 #include "net/proxy/proxy_info.h"
24 #include "net/proxy/proxy_resolver.h"
26 namespace net {
27 namespace {
28 class Job;
30 // An "executor" is a job-runner for PAC requests. It encapsulates a worker
31 // thread and a synchronous ProxyResolver (which will be operated on said
32 // thread.)
33 class Executor : public base::RefCountedThreadSafe<Executor> {
34 public:
35 class Coordinator {
36 public:
37 virtual void OnExecutorReady(Executor* executor) = 0;
39 protected:
40 virtual ~Coordinator() = default;
43 // |coordinator| must remain valid throughout our lifetime. It is used to
44 // signal when the executor is ready to receive work by calling
45 // |coordinator->OnExecutorReady()|.
46 // |thread_number| is an identifier used when naming the worker thread.
47 Executor(Coordinator* coordinator, int thread_number);
49 // Submit a job to this executor.
50 void StartJob(Job* job);
52 // Callback for when a job has completed running on the executor's thread.
53 void OnJobCompleted(Job* job);
55 // Cleanup the executor. Cancels all outstanding work, and frees the thread
56 // and resolver.
57 void Destroy();
59 // Returns the outstanding job, or NULL.
60 Job* outstanding_job() const { return outstanding_job_.get(); }
62 ProxyResolver* resolver() { return resolver_.get(); }
64 int thread_number() const { return thread_number_; }
66 void set_resolver(scoped_ptr<ProxyResolver> resolver) {
67 resolver_ = resolver.Pass();
70 void set_coordinator(Coordinator* coordinator) {
71 DCHECK(coordinator);
72 DCHECK(coordinator_);
73 coordinator_ = coordinator;
76 private:
77 friend class base::RefCountedThreadSafe<Executor>;
78 ~Executor();
80 Coordinator* coordinator_;
81 const int thread_number_;
83 // The currently active job for this executor (either a CreateProxyResolver or
84 // GetProxyForURL task).
85 scoped_refptr<Job> outstanding_job_;
87 // The synchronous resolver implementation.
88 scoped_ptr<ProxyResolver> resolver_;
90 // The thread where |resolver_| is run on.
91 // Note that declaration ordering is important here. |thread_| needs to be
92 // destroyed *before* |resolver_|, in case |resolver_| is currently
93 // executing on |thread_|.
94 scoped_ptr<base::Thread> thread_;
97 class MultiThreadedProxyResolver : public ProxyResolver,
98 public Executor::Coordinator,
99 public base::NonThreadSafe {
100 public:
101 // Creates an asynchronous ProxyResolver that runs requests on up to
102 // |max_num_threads|.
104 // For each thread that is created, an accompanying synchronous ProxyResolver
105 // will be provisioned using |resolver_factory|. All methods on these
106 // ProxyResolvers will be called on the one thread.
107 MultiThreadedProxyResolver(
108 scoped_ptr<ProxyResolverFactory> resolver_factory,
109 size_t max_num_threads,
110 const scoped_refptr<ProxyResolverScriptData>& script_data,
111 scoped_refptr<Executor> executor);
113 ~MultiThreadedProxyResolver() override;
115 // ProxyResolver implementation:
116 int GetProxyForURL(const GURL& url,
117 ProxyInfo* results,
118 const CompletionCallback& callback,
119 RequestHandle* request,
120 const BoundNetLog& net_log) override;
121 void CancelRequest(RequestHandle request) override;
122 LoadState GetLoadState(RequestHandle request) const override;
124 private:
125 class GetProxyForURLJob;
126 // FIFO queue of pending jobs waiting to be started.
127 // TODO(eroman): Make this priority queue.
128 typedef std::deque<scoped_refptr<Job>> PendingJobsQueue;
129 typedef std::vector<scoped_refptr<Executor>> ExecutorList;
131 // Returns an idle worker thread which is ready to receive GetProxyForURL()
132 // requests. If all threads are occupied, returns NULL.
133 Executor* FindIdleExecutor();
135 // Creates a new worker thread, and appends it to |executors_|.
136 void AddNewExecutor();
138 // Starts the next job from |pending_jobs_| if possible.
139 void OnExecutorReady(Executor* executor) override;
141 const scoped_ptr<ProxyResolverFactory> resolver_factory_;
142 const size_t max_num_threads_;
143 PendingJobsQueue pending_jobs_;
144 ExecutorList executors_;
145 scoped_refptr<ProxyResolverScriptData> script_data_;
148 // Job ---------------------------------------------
150 class Job : public base::RefCountedThreadSafe<Job> {
151 public:
152 // Identifies the subclass of Job (only being used for debugging purposes).
153 enum Type {
154 TYPE_GET_PROXY_FOR_URL,
155 TYPE_CREATE_RESOLVER,
158 Job(Type type, const CompletionCallback& callback)
159 : type_(type),
160 callback_(callback),
161 executor_(NULL),
162 was_cancelled_(false) {
165 void set_executor(Executor* executor) {
166 executor_ = executor;
169 // The "executor" is the job runner that is scheduling this job. If
170 // this job has not been submitted to an executor yet, this will be
171 // NULL (and we know it hasn't started yet).
172 Executor* executor() {
173 return executor_;
176 // Mark the job as having been cancelled.
177 void Cancel() {
178 was_cancelled_ = true;
181 // Returns true if Cancel() has been called.
182 bool was_cancelled() const { return was_cancelled_; }
184 Type type() const { return type_; }
186 // Returns true if this job still has a user callback. Some jobs
187 // do not have a user callback, because they were helper jobs
188 // scheduled internally (for example TYPE_CREATE_RESOLVER).
190 // Otherwise jobs that correspond with user-initiated work will
191 // have a non-null callback up until the callback is run.
192 bool has_user_callback() const { return !callback_.is_null(); }
194 // This method is called when the job is inserted into a wait queue
195 // because no executors were ready to accept it.
196 virtual void WaitingForThread() {}
198 // This method is called just before the job is posted to the work thread.
199 virtual void FinishedWaitingForThread() {}
201 // This method is called on the worker thread to do the job's work. On
202 // completion, implementors are expected to call OnJobCompleted() on
203 // |origin_runner|.
204 virtual void Run(
205 scoped_refptr<base::SingleThreadTaskRunner> origin_runner) = 0;
207 protected:
208 void OnJobCompleted() {
209 // |executor_| will be NULL if the executor has already been deleted.
210 if (executor_)
211 executor_->OnJobCompleted(this);
214 void RunUserCallback(int result) {
215 DCHECK(has_user_callback());
216 CompletionCallback callback = callback_;
217 // Reset the callback so has_user_callback() will now return false.
218 callback_.Reset();
219 callback.Run(result);
222 friend class base::RefCountedThreadSafe<Job>;
224 virtual ~Job() {}
226 private:
227 const Type type_;
228 CompletionCallback callback_;
229 Executor* executor_;
230 bool was_cancelled_;
233 // CreateResolverJob -----------------------------------------------------------
235 // Runs on the worker thread to call ProxyResolverFactory::CreateProxyResolver.
236 class CreateResolverJob : public Job {
237 public:
238 CreateResolverJob(const scoped_refptr<ProxyResolverScriptData>& script_data,
239 ProxyResolverFactory* factory)
240 : Job(TYPE_CREATE_RESOLVER, CompletionCallback()),
241 script_data_(script_data),
242 factory_(factory) {}
244 // Runs on the worker thread.
245 void Run(scoped_refptr<base::SingleThreadTaskRunner> origin_runner) override {
246 scoped_ptr<ProxyResolverFactory::Request> request;
247 int rv = factory_->CreateProxyResolver(script_data_, &resolver_,
248 CompletionCallback(), &request);
250 DCHECK_NE(rv, ERR_IO_PENDING);
251 origin_runner->PostTask(
252 FROM_HERE, base::Bind(&CreateResolverJob::RequestComplete, this, rv));
255 protected:
256 ~CreateResolverJob() override {}
258 private:
259 // Runs the completion callback on the origin thread.
260 void RequestComplete(int result_code) {
261 // The task may have been cancelled after it was started.
262 if (!was_cancelled()) {
263 DCHECK(executor());
264 executor()->set_resolver(resolver_.Pass());
266 OnJobCompleted();
269 const scoped_refptr<ProxyResolverScriptData> script_data_;
270 ProxyResolverFactory* factory_;
271 scoped_ptr<ProxyResolver> resolver_;
274 // MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
276 class MultiThreadedProxyResolver::GetProxyForURLJob : public Job {
277 public:
278 // |url| -- the URL of the query.
279 // |results| -- the structure to fill with proxy resolve results.
280 GetProxyForURLJob(const GURL& url,
281 ProxyInfo* results,
282 const CompletionCallback& callback,
283 const BoundNetLog& net_log)
284 : Job(TYPE_GET_PROXY_FOR_URL, callback),
285 results_(results),
286 net_log_(net_log),
287 url_(url),
288 was_waiting_for_thread_(false) {
289 DCHECK(!callback.is_null());
292 BoundNetLog* net_log() { return &net_log_; }
294 void WaitingForThread() override {
295 was_waiting_for_thread_ = true;
296 net_log_.BeginEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD);
299 void FinishedWaitingForThread() override {
300 DCHECK(executor());
302 if (was_waiting_for_thread_) {
303 net_log_.EndEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD);
306 net_log_.AddEvent(
307 NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD,
308 NetLog::IntegerCallback("thread_number", executor()->thread_number()));
311 // Runs on the worker thread.
312 void Run(scoped_refptr<base::SingleThreadTaskRunner> origin_runner) override {
313 ProxyResolver* resolver = executor()->resolver();
314 DCHECK(resolver);
315 int rv = resolver->GetProxyForURL(
316 url_, &results_buf_, CompletionCallback(), NULL, net_log_);
317 DCHECK_NE(rv, ERR_IO_PENDING);
319 origin_runner->PostTask(
320 FROM_HERE, base::Bind(&GetProxyForURLJob::QueryComplete, this, rv));
323 protected:
324 ~GetProxyForURLJob() override {}
326 private:
327 // Runs the completion callback on the origin thread.
328 void QueryComplete(int result_code) {
329 // The Job may have been cancelled after it was started.
330 if (!was_cancelled()) {
331 if (result_code >= OK) { // Note: unit-tests use values > 0.
332 results_->Use(results_buf_);
334 RunUserCallback(result_code);
336 OnJobCompleted();
339 // Must only be used on the "origin" thread.
340 ProxyInfo* results_;
342 // Can be used on either "origin" or worker thread.
343 BoundNetLog net_log_;
344 const GURL url_;
346 // Usable from within DoQuery on the worker thread.
347 ProxyInfo results_buf_;
349 bool was_waiting_for_thread_;
352 // Executor ----------------------------------------
354 Executor::Executor(Executor::Coordinator* coordinator, int thread_number)
355 : coordinator_(coordinator), thread_number_(thread_number) {
356 DCHECK(coordinator);
357 // Start up the thread.
358 thread_.reset(new base::Thread(base::StringPrintf("PAC thread #%d",
359 thread_number)));
360 CHECK(thread_->Start());
363 void Executor::StartJob(Job* job) {
364 DCHECK(!outstanding_job_.get());
365 outstanding_job_ = job;
367 // Run the job. Once it has completed (regardless of whether it was
368 // cancelled), it will invoke OnJobCompleted() on this thread.
369 job->set_executor(this);
370 job->FinishedWaitingForThread();
371 thread_->task_runner()->PostTask(
372 FROM_HERE,
373 base::Bind(&Job::Run, job, base::ThreadTaskRunnerHandle::Get()));
376 void Executor::OnJobCompleted(Job* job) {
377 DCHECK_EQ(job, outstanding_job_.get());
378 outstanding_job_ = NULL;
379 coordinator_->OnExecutorReady(this);
382 void Executor::Destroy() {
383 DCHECK(coordinator_);
386 // See http://crbug.com/69710.
387 base::ThreadRestrictions::ScopedAllowIO allow_io;
389 // Join the worker thread.
390 thread_.reset();
393 // Cancel any outstanding job.
394 if (outstanding_job_.get()) {
395 outstanding_job_->Cancel();
396 // Orphan the job (since this executor may be deleted soon).
397 outstanding_job_->set_executor(NULL);
400 // It is now safe to free the ProxyResolver, since all the tasks that
401 // were using it on the resolver thread have completed.
402 resolver_.reset();
404 // Null some stuff as a precaution.
405 coordinator_ = NULL;
406 outstanding_job_ = NULL;
409 Executor::~Executor() {
410 // The important cleanup happens as part of Destroy(), which should always be
411 // called first.
412 DCHECK(!coordinator_) << "Destroy() was not called";
413 DCHECK(!thread_.get());
414 DCHECK(!resolver_.get());
415 DCHECK(!outstanding_job_.get());
418 // MultiThreadedProxyResolver --------------------------------------------------
420 MultiThreadedProxyResolver::MultiThreadedProxyResolver(
421 scoped_ptr<ProxyResolverFactory> resolver_factory,
422 size_t max_num_threads,
423 const scoped_refptr<ProxyResolverScriptData>& script_data,
424 scoped_refptr<Executor> executor)
425 : resolver_factory_(resolver_factory.Pass()),
426 max_num_threads_(max_num_threads),
427 script_data_(script_data) {
428 DCHECK(script_data_);
429 executor->set_coordinator(this);
430 executors_.push_back(executor);
433 MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
434 DCHECK(CalledOnValidThread());
435 // We will cancel all outstanding requests.
436 pending_jobs_.clear();
438 for (auto& executor : executors_) {
439 executor->Destroy();
443 int MultiThreadedProxyResolver::GetProxyForURL(
444 const GURL& url, ProxyInfo* results, const CompletionCallback& callback,
445 RequestHandle* request, const BoundNetLog& net_log) {
446 DCHECK(CalledOnValidThread());
447 DCHECK(!callback.is_null());
449 scoped_refptr<GetProxyForURLJob> job(
450 new GetProxyForURLJob(url, results, callback, net_log));
452 // Completion will be notified through |callback|, unless the caller cancels
453 // the request using |request|.
454 if (request)
455 *request = reinterpret_cast<RequestHandle>(job.get());
457 // If there is an executor that is ready to run this request, submit it!
458 Executor* executor = FindIdleExecutor();
459 if (executor) {
460 DCHECK_EQ(0u, pending_jobs_.size());
461 executor->StartJob(job.get());
462 return ERR_IO_PENDING;
465 // Otherwise queue this request. (We will schedule it to a thread once one
466 // becomes available).
467 job->WaitingForThread();
468 pending_jobs_.push_back(job);
470 // If we haven't already reached the thread limit, provision a new thread to
471 // drain the requests more quickly.
472 if (executors_.size() < max_num_threads_)
473 AddNewExecutor();
475 return ERR_IO_PENDING;
478 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) {
479 DCHECK(CalledOnValidThread());
480 DCHECK(req);
482 Job* job = reinterpret_cast<Job*>(req);
483 DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type());
485 if (job->executor()) {
486 // If the job was already submitted to the executor, just mark it
487 // as cancelled so the user callback isn't run on completion.
488 job->Cancel();
489 } else {
490 // Otherwise the job is just sitting in a queue.
491 PendingJobsQueue::iterator it =
492 std::find(pending_jobs_.begin(), pending_jobs_.end(), job);
493 DCHECK(it != pending_jobs_.end());
494 pending_jobs_.erase(it);
498 LoadState MultiThreadedProxyResolver::GetLoadState(RequestHandle req) const {
499 DCHECK(CalledOnValidThread());
500 DCHECK(req);
501 return LOAD_STATE_RESOLVING_PROXY_FOR_URL;
504 Executor* MultiThreadedProxyResolver::FindIdleExecutor() {
505 DCHECK(CalledOnValidThread());
506 for (ExecutorList::iterator it = executors_.begin();
507 it != executors_.end(); ++it) {
508 Executor* executor = it->get();
509 if (!executor->outstanding_job())
510 return executor;
512 return NULL;
515 void MultiThreadedProxyResolver::AddNewExecutor() {
516 DCHECK(CalledOnValidThread());
517 DCHECK_LT(executors_.size(), max_num_threads_);
518 // The "thread number" is used to give the thread a unique name.
519 int thread_number = executors_.size();
520 Executor* executor = new Executor(this, thread_number);
521 executor->StartJob(
522 new CreateResolverJob(script_data_, resolver_factory_.get()));
523 executors_.push_back(make_scoped_refptr(executor));
526 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
527 DCHECK(CalledOnValidThread());
528 if (pending_jobs_.empty())
529 return;
531 // Get the next job to process (FIFO). Transfer it from the pending queue
532 // to the executor.
533 scoped_refptr<Job> job = pending_jobs_.front();
534 pending_jobs_.pop_front();
535 executor->StartJob(job.get());
538 } // namespace
540 class MultiThreadedProxyResolverFactory::Job
541 : public ProxyResolverFactory::Request,
542 public Executor::Coordinator {
543 public:
544 Job(MultiThreadedProxyResolverFactory* factory,
545 const scoped_refptr<ProxyResolverScriptData>& script_data,
546 scoped_ptr<ProxyResolver>* resolver,
547 scoped_ptr<ProxyResolverFactory> resolver_factory,
548 size_t max_num_threads,
549 const CompletionCallback& callback)
550 : factory_(factory),
551 resolver_out_(resolver),
552 resolver_factory_(resolver_factory.Pass()),
553 max_num_threads_(max_num_threads),
554 script_data_(script_data),
555 executor_(new Executor(this, 0)),
556 callback_(callback) {
557 executor_->StartJob(
558 new CreateResolverJob(script_data_, resolver_factory_.get()));
561 ~Job() override {
562 if (factory_) {
563 executor_->Destroy();
564 factory_->RemoveJob(this);
568 void FactoryDestroyed() {
569 executor_->Destroy();
570 executor_ = nullptr;
571 factory_ = nullptr;
574 private:
575 void OnExecutorReady(Executor* executor) override {
576 int error = OK;
577 if (executor->resolver()) {
578 resolver_out_->reset(new MultiThreadedProxyResolver(
579 resolver_factory_.Pass(), max_num_threads_, script_data_.Pass(),
580 executor_));
581 } else {
582 error = ERR_PAC_SCRIPT_FAILED;
583 executor_->Destroy();
585 factory_->RemoveJob(this);
586 factory_ = nullptr;
587 callback_.Run(error);
590 MultiThreadedProxyResolverFactory* factory_;
591 scoped_ptr<ProxyResolver>* const resolver_out_;
592 scoped_ptr<ProxyResolverFactory> resolver_factory_;
593 const size_t max_num_threads_;
594 scoped_refptr<ProxyResolverScriptData> script_data_;
595 scoped_refptr<Executor> executor_;
596 const CompletionCallback callback_;
599 MultiThreadedProxyResolverFactory::MultiThreadedProxyResolverFactory(
600 size_t max_num_threads,
601 bool factory_expects_bytes)
602 : ProxyResolverFactory(factory_expects_bytes),
603 max_num_threads_(max_num_threads) {
604 DCHECK_GE(max_num_threads, 1u);
607 MultiThreadedProxyResolverFactory::~MultiThreadedProxyResolverFactory() {
608 for (auto job : jobs_) {
609 job->FactoryDestroyed();
613 int MultiThreadedProxyResolverFactory::CreateProxyResolver(
614 const scoped_refptr<ProxyResolverScriptData>& pac_script,
615 scoped_ptr<ProxyResolver>* resolver,
616 const CompletionCallback& callback,
617 scoped_ptr<Request>* request) {
618 scoped_ptr<Job> job(new Job(this, pac_script, resolver,
619 CreateProxyResolverFactory(), max_num_threads_,
620 callback));
621 jobs_.insert(job.get());
622 *request = job.Pass();
623 return ERR_IO_PENDING;
626 void MultiThreadedProxyResolverFactory::RemoveJob(
627 MultiThreadedProxyResolverFactory::Job* job) {
628 size_t erased = jobs_.erase(job);
629 DCHECK_EQ(1u, erased);
632 } // namespace net