1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "chrome/browser/sync_file_system/drive_backend/sync_task_manager.h"
8 #include "base/location.h"
9 #include "base/memory/scoped_ptr.h"
10 #include "base/sequenced_task_runner.h"
11 #include "chrome/browser/sync_file_system/drive_backend/sync_task.h"
12 #include "chrome/browser/sync_file_system/drive_backend/sync_task_token.h"
13 #include "chrome/browser/sync_file_system/sync_file_metadata.h"
15 using storage::FileSystemURL
;
17 namespace sync_file_system
{
18 namespace drive_backend
{
22 class SyncTaskAdapter
: public ExclusiveTask
{
24 explicit SyncTaskAdapter(const SyncTaskManager::Task
& task
) : task_(task
) {}
25 virtual ~SyncTaskAdapter() {}
27 virtual void RunExclusive(const SyncStatusCallback
& callback
) OVERRIDE
{
32 SyncTaskManager::Task task_
;
34 DISALLOW_COPY_AND_ASSIGN(SyncTaskAdapter
);
39 SyncTaskManager::PendingTask::PendingTask() {}
41 SyncTaskManager::PendingTask::PendingTask(
42 const base::Closure
& task
, Priority pri
, int seq
)
43 : task(task
), priority(pri
), seq(seq
) {}
45 SyncTaskManager::PendingTask::~PendingTask() {}
47 bool SyncTaskManager::PendingTaskComparator::operator()(
48 const PendingTask
& left
,
49 const PendingTask
& right
) const {
50 if (left
.priority
!= right
.priority
)
51 return left
.priority
< right
.priority
;
52 return left
.seq
> right
.seq
;
55 SyncTaskManager::SyncTaskManager(
56 base::WeakPtr
<Client
> client
,
57 size_t maximum_background_task
,
58 base::SequencedTaskRunner
* task_runner
)
60 maximum_background_task_(maximum_background_task
),
62 task_token_seq_(SyncTaskToken::kMinimumBackgroundTaskTokenID
),
63 task_runner_(task_runner
) {
66 SyncTaskManager::~SyncTaskManager() {
71 void SyncTaskManager::Initialize(SyncStatusCode status
) {
72 DCHECK(sequence_checker_
.CalledOnValidSequencedThread());
75 SyncTaskToken::CreateForForegroundTask(AsWeakPtr(), task_runner_
.get()),
79 void SyncTaskManager::ScheduleTask(
80 const tracked_objects::Location
& from_here
,
83 const SyncStatusCallback
& callback
) {
84 DCHECK(sequence_checker_
.CalledOnValidSequencedThread());
86 ScheduleSyncTask(from_here
,
87 scoped_ptr
<SyncTask
>(new SyncTaskAdapter(task
)),
92 void SyncTaskManager::ScheduleSyncTask(
93 const tracked_objects::Location
& from_here
,
94 scoped_ptr
<SyncTask
> task
,
96 const SyncStatusCallback
& callback
) {
97 DCHECK(sequence_checker_
.CalledOnValidSequencedThread());
99 scoped_ptr
<SyncTaskToken
> token(GetToken(from_here
, callback
));
102 base::Bind(&SyncTaskManager::ScheduleSyncTask
, AsWeakPtr(), from_here
,
103 base::Passed(&task
), priority
, callback
),
107 RunTask(token
.Pass(), task
.Pass());
110 bool SyncTaskManager::ScheduleTaskIfIdle(
111 const tracked_objects::Location
& from_here
,
113 const SyncStatusCallback
& callback
) {
114 DCHECK(sequence_checker_
.CalledOnValidSequencedThread());
116 return ScheduleSyncTaskIfIdle(
118 scoped_ptr
<SyncTask
>(new SyncTaskAdapter(task
)),
122 bool SyncTaskManager::ScheduleSyncTaskIfIdle(
123 const tracked_objects::Location
& from_here
,
124 scoped_ptr
<SyncTask
> task
,
125 const SyncStatusCallback
& callback
) {
126 DCHECK(sequence_checker_
.CalledOnValidSequencedThread());
128 scoped_ptr
<SyncTaskToken
> token(GetToken(from_here
, callback
));
131 RunTask(token
.Pass(), task
.Pass());
136 void SyncTaskManager::NotifyTaskDone(scoped_ptr
<SyncTaskToken
> token
,
137 SyncStatusCode status
) {
140 SyncTaskManager
* manager
= token
->manager();
141 if (token
->token_id() == SyncTaskToken::kTestingTaskTokenID
) {
143 SyncStatusCallback callback
= token
->callback();
144 token
->clear_callback();
145 callback
.Run(status
);
150 manager
->NotifyTaskDoneBody(token
.Pass(), status
);
154 void SyncTaskManager::UpdateBlockingFactor(
155 scoped_ptr
<SyncTaskToken
> current_task_token
,
156 scoped_ptr
<BlockingFactor
> blocking_factor
,
157 const Continuation
& continuation
) {
158 DCHECK(current_task_token
);
160 SyncTaskManager
* manager
= current_task_token
->manager();
161 if (current_task_token
->token_id() == SyncTaskToken::kTestingTaskTokenID
) {
163 continuation
.Run(current_task_token
.Pass());
170 scoped_ptr
<SyncTaskToken
> foreground_task_token
;
171 scoped_ptr
<SyncTaskToken
> background_task_token
;
172 scoped_ptr
<TaskLogger::TaskLog
> task_log
= current_task_token
->PassTaskLog();
173 if (current_task_token
->token_id() == SyncTaskToken::kForegroundTaskTokenID
)
174 foreground_task_token
= current_task_token
.Pass();
176 background_task_token
= current_task_token
.Pass();
178 manager
->UpdateBlockingFactorBody(foreground_task_token
.Pass(),
179 background_task_token
.Pass(),
181 blocking_factor
.Pass(),
185 bool SyncTaskManager::IsRunningTask(int64 token_id
) const {
186 DCHECK(sequence_checker_
.CalledOnValidSequencedThread());
188 // If the client is gone, all task should be aborted.
192 if (token_id
== SyncTaskToken::kForegroundTaskTokenID
)
195 return ContainsKey(running_background_tasks_
, token_id
);
198 void SyncTaskManager::DetachFromSequence() {
199 sequence_checker_
.DetachFromSequence();
202 void SyncTaskManager::NotifyTaskDoneBody(scoped_ptr
<SyncTaskToken
> token
,
203 SyncStatusCode status
) {
204 DCHECK(sequence_checker_
.CalledOnValidSequencedThread());
207 DVLOG(3) << "NotifyTaskDone: " << "finished with status=" << status
208 << " (" << SyncStatusCodeToString(status
) << ")"
209 << " " << token
->location().ToString();
211 if (token
->blocking_factor()) {
212 dependency_manager_
.Erase(token
->blocking_factor());
213 token
->clear_blocking_factor();
217 if (token
->has_task_log()) {
218 token
->FinalizeTaskLog(SyncStatusCodeToString(status
));
219 client_
->RecordTaskLog(token
->PassTaskLog());
223 scoped_ptr
<SyncTask
> task
;
224 SyncStatusCallback callback
= token
->callback();
225 token
->clear_callback();
226 if (token
->token_id() == SyncTaskToken::kForegroundTaskTokenID
) {
227 token_
= token
.Pass();
228 task
= running_foreground_task_
.Pass();
230 task
= running_background_tasks_
.take_and_erase(token
->token_id());
233 // Acquire the token to prevent a new task to jump into the queue.
234 token
= token_
.Pass();
236 bool task_used_network
= false;
238 task_used_network
= task
->used_network();
241 client_
->NotifyLastOperationStatus(status
, task_used_network
);
243 if (!callback
.is_null())
244 callback
.Run(status
);
246 // Post MaybeStartNextForegroundTask rather than calling it directly to avoid
247 // making the call-chaing longer.
248 task_runner_
->PostTask(
250 base::Bind(&SyncTaskManager::MaybeStartNextForegroundTask
,
251 AsWeakPtr(), base::Passed(&token
)));
254 void SyncTaskManager::UpdateBlockingFactorBody(
255 scoped_ptr
<SyncTaskToken
> foreground_task_token
,
256 scoped_ptr
<SyncTaskToken
> background_task_token
,
257 scoped_ptr
<TaskLogger::TaskLog
> task_log
,
258 scoped_ptr
<BlockingFactor
> blocking_factor
,
259 const Continuation
& continuation
) {
260 DCHECK(sequence_checker_
.CalledOnValidSequencedThread());
262 // Run the task directly if the parallelization is disabled.
263 if (!maximum_background_task_
) {
264 DCHECK(foreground_task_token
);
265 DCHECK(!background_task_token
);
266 foreground_task_token
->SetTaskLog(task_log
.Pass());
267 continuation
.Run(foreground_task_token
.Pass());
271 // Clear existing |blocking_factor| from |dependency_manager_| before
272 // getting |foreground_task_token|, so that we can avoid dead lock.
273 if (background_task_token
&& background_task_token
->blocking_factor()) {
274 dependency_manager_
.Erase(background_task_token
->blocking_factor());
275 background_task_token
->clear_blocking_factor();
278 // Try to get |foreground_task_token|. If it's not available, wait for
279 // current foreground task to finish.
280 if (!foreground_task_token
) {
281 DCHECK(background_task_token
);
282 foreground_task_token
= GetToken(background_task_token
->location(),
283 SyncStatusCallback());
284 if (!foreground_task_token
) {
286 base::Bind(&SyncTaskManager::UpdateBlockingFactorBody
,
288 base::Passed(&foreground_task_token
),
289 base::Passed(&background_task_token
),
290 base::Passed(&task_log
),
291 base::Passed(&blocking_factor
),
294 MaybeStartNextForegroundTask(scoped_ptr
<SyncTaskToken
>());
299 // Check if the task can run as a background task now.
300 // If there are too many task running or any other task blocks current
301 // task, wait for any other task to finish.
302 bool task_number_limit_exceeded
=
303 !background_task_token
&&
304 running_background_tasks_
.size() >= maximum_background_task_
;
305 if (task_number_limit_exceeded
||
306 !dependency_manager_
.Insert(blocking_factor
.get())) {
307 DCHECK(!running_background_tasks_
.empty());
308 DCHECK(pending_backgrounding_task_
.is_null());
310 // Wait for NotifyTaskDone to release a |blocking_factor|.
311 pending_backgrounding_task_
=
312 base::Bind(&SyncTaskManager::UpdateBlockingFactorBody
,
314 base::Passed(&foreground_task_token
),
315 base::Passed(&background_task_token
),
316 base::Passed(&task_log
),
317 base::Passed(&blocking_factor
),
322 if (background_task_token
) {
323 background_task_token
->set_blocking_factor(blocking_factor
.Pass());
325 tracked_objects::Location from_here
= foreground_task_token
->location();
326 SyncStatusCallback callback
= foreground_task_token
->callback();
327 foreground_task_token
->clear_callback();
329 background_task_token
=
330 SyncTaskToken::CreateForBackgroundTask(AsWeakPtr(),
333 blocking_factor
.Pass());
334 background_task_token
->UpdateTask(from_here
, callback
);
335 running_background_tasks_
.set(background_task_token
->token_id(),
336 running_foreground_task_
.Pass());
339 token_
= foreground_task_token
.Pass();
340 MaybeStartNextForegroundTask(scoped_ptr
<SyncTaskToken
>());
341 background_task_token
->SetTaskLog(task_log
.Pass());
342 continuation
.Run(background_task_token
.Pass());
345 scoped_ptr
<SyncTaskToken
> SyncTaskManager::GetToken(
346 const tracked_objects::Location
& from_here
,
347 const SyncStatusCallback
& callback
) {
348 DCHECK(sequence_checker_
.CalledOnValidSequencedThread());
351 return scoped_ptr
<SyncTaskToken
>();
352 token_
->UpdateTask(from_here
, callback
);
353 return token_
.Pass();
356 void SyncTaskManager::PushPendingTask(
357 const base::Closure
& closure
, Priority priority
) {
358 DCHECK(sequence_checker_
.CalledOnValidSequencedThread());
360 pending_tasks_
.push(PendingTask(closure
, priority
, pending_task_seq_
++));
363 void SyncTaskManager::RunTask(scoped_ptr
<SyncTaskToken
> token
,
364 scoped_ptr
<SyncTask
> task
) {
365 DCHECK(sequence_checker_
.CalledOnValidSequencedThread());
366 DCHECK(!running_foreground_task_
);
368 running_foreground_task_
= task
.Pass();
369 running_foreground_task_
->RunPreflight(token
.Pass());
372 void SyncTaskManager::MaybeStartNextForegroundTask(
373 scoped_ptr
<SyncTaskToken
> token
) {
374 DCHECK(sequence_checker_
.CalledOnValidSequencedThread());
378 token_
= token
.Pass();
381 if (!pending_backgrounding_task_
.is_null()) {
382 base::Closure closure
= pending_backgrounding_task_
;
383 pending_backgrounding_task_
.Reset();
391 if (!pending_tasks_
.empty()) {
392 base::Closure closure
= pending_tasks_
.top().task
;
393 pending_tasks_
.pop();
399 client_
->MaybeScheduleNextTask();
402 } // namespace drive_backend
403 } // namespace sync_file_system