1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "sync/engine/sync_scheduler_impl.h"
10 #include "base/auto_reset.h"
11 #include "base/bind.h"
12 #include "base/bind_helpers.h"
13 #include "base/compiler_specific.h"
14 #include "base/location.h"
15 #include "base/logging.h"
16 #include "base/message_loop/message_loop.h"
17 #include "sync/engine/backoff_delay_provider.h"
18 #include "sync/engine/syncer.h"
19 #include "sync/protocol/proto_enum_conversions.h"
20 #include "sync/protocol/sync.pb.h"
21 #include "sync/util/data_type_histogram.h"
22 #include "sync/util/logging.h"
24 using base::TimeDelta
;
25 using base::TimeTicks
;
29 using sessions::SyncSession
;
30 using sessions::SyncSessionSnapshot
;
31 using sync_pb::GetUpdatesCallerInfo
;
35 bool ShouldRequestEarlyExit(const SyncProtocolError
& error
) {
36 switch (error
.error_type
) {
44 case DISABLED_BY_ADMIN
:
46 // If we send terminate sync early then |sync_cycle_ended| notification
47 // would not be sent. If there were no actions then |ACTIONABLE_ERROR|
48 // notification wouldnt be sent either. Then the UI layer would be left
49 // waiting forever. So assert we would send something.
50 DCHECK_NE(error
.action
, UNKNOWN_ACTION
);
52 case INVALID_CREDENTIAL
:
53 // The notification for this is handled by PostAndProcessHeaders|.
54 // Server does no have to send any action for this.
56 // Make the default a NOTREACHED. So if a new error is introduced we
57 // think about its expected functionality.
64 bool IsActionableError(
65 const SyncProtocolError
& error
) {
66 return (error
.action
!= UNKNOWN_ACTION
);
70 ConfigurationParams::ConfigurationParams()
71 : source(GetUpdatesCallerInfo::UNKNOWN
) {}
72 ConfigurationParams::ConfigurationParams(
73 const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource
& source
,
74 ModelTypeSet types_to_download
,
75 const ModelSafeRoutingInfo
& routing_info
,
76 const base::Closure
& ready_task
,
77 const base::Closure
& retry_task
)
79 types_to_download(types_to_download
),
80 routing_info(routing_info
),
81 ready_task(ready_task
),
82 retry_task(retry_task
) {
83 DCHECK(!ready_task
.is_null());
84 DCHECK(!retry_task
.is_null());
86 ConfigurationParams::~ConfigurationParams() {}
88 SyncSchedulerImpl::WaitInterval::WaitInterval()
91 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode
, TimeDelta length
)
92 : mode(mode
), length(length
) {}
94 SyncSchedulerImpl::WaitInterval::~WaitInterval() {}
96 #define ENUM_CASE(x) case x: return #x; break;
98 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode
) {
101 ENUM_CASE(EXPONENTIAL_BACKOFF
);
102 ENUM_CASE(THROTTLED
);
108 GetUpdatesCallerInfo::GetUpdatesSource
GetUpdatesFromNudgeSource(
109 NudgeSource source
) {
111 case NUDGE_SOURCE_NOTIFICATION
:
112 return GetUpdatesCallerInfo::NOTIFICATION
;
113 case NUDGE_SOURCE_LOCAL
:
114 return GetUpdatesCallerInfo::LOCAL
;
115 case NUDGE_SOURCE_LOCAL_REFRESH
:
116 return GetUpdatesCallerInfo::DATATYPE_REFRESH
;
117 case NUDGE_SOURCE_UNKNOWN
:
118 return GetUpdatesCallerInfo::UNKNOWN
;
121 return GetUpdatesCallerInfo::UNKNOWN
;
125 // Helper macros to log with the syncer thread name; useful when there
126 // are multiple syncer threads involved.
128 #define SLOG(severity) LOG(severity) << name_ << ": "
130 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": "
132 #define SDVLOG_LOC(from_here, verbose_level) \
133 DVLOG_LOC(from_here, verbose_level) << name_ << ": "
137 const int kDefaultSessionsCommitDelaySeconds
= 10;
139 bool IsConfigRelatedUpdateSourceValue(
140 GetUpdatesCallerInfo::GetUpdatesSource source
) {
142 case GetUpdatesCallerInfo::RECONFIGURATION
:
143 case GetUpdatesCallerInfo::MIGRATION
:
144 case GetUpdatesCallerInfo::NEW_CLIENT
:
145 case GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE
:
154 SyncSchedulerImpl::SyncSchedulerImpl(const std::string
& name
,
155 BackoffDelayProvider
* delay_provider
,
156 sessions::SyncSessionContext
* context
,
160 syncer_short_poll_interval_seconds_(
161 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds
)),
162 syncer_long_poll_interval_seconds_(
163 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds
)),
164 sessions_commit_delay_(
165 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds
)),
167 delay_provider_(delay_provider
),
169 session_context_(context
),
170 no_scheduling_allowed_(false),
171 do_poll_after_credentials_updated_(false),
172 next_sync_session_job_priority_(NORMAL_PRIORITY
),
173 weak_ptr_factory_(this),
174 weak_ptr_factory_for_weak_handle_(this) {
175 weak_handle_this_
= MakeWeakHandle(
176 weak_ptr_factory_for_weak_handle_
.GetWeakPtr());
179 SyncSchedulerImpl::~SyncSchedulerImpl() {
180 DCHECK(CalledOnValidThread());
184 void SyncSchedulerImpl::OnCredentialsUpdated() {
185 DCHECK(CalledOnValidThread());
187 if (HttpResponse::SYNC_AUTH_ERROR
==
188 session_context_
->connection_manager()->server_status()) {
189 OnServerConnectionErrorFixed();
193 void SyncSchedulerImpl::OnConnectionStatusChange() {
194 if (HttpResponse::CONNECTION_UNAVAILABLE
==
195 session_context_
->connection_manager()->server_status()) {
196 // Optimistically assume that the connection is fixed and try
198 OnServerConnectionErrorFixed();
202 void SyncSchedulerImpl::OnServerConnectionErrorFixed() {
203 // There could be a pending nudge or configuration job in several cases:
205 // 1. We're in exponential backoff.
206 // 2. We're silenced / throttled.
207 // 3. A nudge was saved previously due to not having a valid auth token.
208 // 4. A nudge was scheduled + saved while in configuration mode.
210 // In all cases except (2), we want to retry contacting the server. We
211 // call TryCanaryJob to achieve this, and note that nothing -- not even a
212 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that
213 // has the authority to do that is the Unthrottle timer.
217 void SyncSchedulerImpl::Start(Mode mode
) {
218 DCHECK(CalledOnValidThread());
219 std::string thread_name
= base::MessageLoop::current()->thread_name();
220 if (thread_name
.empty())
221 thread_name
= "<Main thread>";
222 SDVLOG(2) << "Start called from thread "
223 << thread_name
<< " with mode " << GetModeString(mode
);
226 SendInitialSnapshot();
229 DCHECK(!session_context_
->account_name().empty());
230 DCHECK(syncer_
.get());
231 Mode old_mode
= mode_
;
233 AdjustPolling(UPDATE_INTERVAL
); // Will kick start poll timer if needed.
235 if (old_mode
!= mode_
&& mode_
== NORMAL_MODE
) {
236 // We just got back to normal mode. Let's try to run the work that was
237 // queued up while we were configuring.
239 // Update our current time before checking IsRetryRequired().
240 nudge_tracker_
.SetSyncCycleStartTime(base::TimeTicks::Now());
241 if (nudge_tracker_
.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY
)) {
247 ModelTypeSet
SyncSchedulerImpl::GetEnabledAndUnthrottledTypes() {
248 ModelTypeSet enabled_types
= session_context_
->GetEnabledTypes();
249 ModelTypeSet enabled_protocol_types
=
250 Intersection(ProtocolTypes(), enabled_types
);
251 ModelTypeSet throttled_types
= nudge_tracker_
.GetThrottledTypes();
252 return Difference(enabled_protocol_types
, throttled_types
);
255 void SyncSchedulerImpl::SendInitialSnapshot() {
256 DCHECK(CalledOnValidThread());
257 scoped_ptr
<SyncSession
> dummy(SyncSession::Build(session_context_
, this));
258 SyncCycleEvent
event(SyncCycleEvent::STATUS_CHANGED
);
259 event
.snapshot
= dummy
->TakeSnapshot();
260 FOR_EACH_OBSERVER(SyncEngineEventListener
,
261 *session_context_
->listeners(),
262 OnSyncCycleEvent(event
));
267 // Helper to extract the routing info corresponding to types in
268 // |types_to_download| from |current_routes|.
269 void BuildModelSafeParams(
270 ModelTypeSet types_to_download
,
271 const ModelSafeRoutingInfo
& current_routes
,
272 ModelSafeRoutingInfo
* result_routes
) {
273 for (ModelTypeSet::Iterator iter
= types_to_download
.First(); iter
.Good();
275 ModelType type
= iter
.Get();
276 ModelSafeRoutingInfo::const_iterator route
= current_routes
.find(type
);
277 DCHECK(route
!= current_routes
.end());
278 ModelSafeGroup group
= route
->second
;
279 (*result_routes
)[type
] = group
;
285 void SyncSchedulerImpl::ScheduleConfiguration(
286 const ConfigurationParams
& params
) {
287 DCHECK(CalledOnValidThread());
288 DCHECK(IsConfigRelatedUpdateSourceValue(params
.source
));
289 DCHECK_EQ(CONFIGURATION_MODE
, mode_
);
290 DCHECK(!params
.ready_task
.is_null());
291 CHECK(started_
) << "Scheduler must be running to configure.";
292 SDVLOG(2) << "Reconfiguring syncer.";
294 // Only one configuration is allowed at a time. Verify we're not waiting
295 // for a pending configure job.
296 DCHECK(!pending_configure_params_
);
298 ModelSafeRoutingInfo restricted_routes
;
299 BuildModelSafeParams(params
.types_to_download
,
302 session_context_
->SetRoutingInfo(restricted_routes
);
304 // Only reconfigure if we have types to download.
305 if (!params
.types_to_download
.Empty()) {
306 pending_configure_params_
.reset(new ConfigurationParams(params
));
309 SDVLOG(2) << "No change in routing info, calling ready task directly.";
310 params
.ready_task
.Run();
314 bool SyncSchedulerImpl::CanRunJobNow(JobPriority priority
) {
315 DCHECK(CalledOnValidThread());
316 if (wait_interval_
&& wait_interval_
->mode
== WaitInterval::THROTTLED
) {
317 SDVLOG(1) << "Unable to run a job because we're throttled.";
322 && wait_interval_
->mode
== WaitInterval::EXPONENTIAL_BACKOFF
323 && priority
!= CANARY_PRIORITY
) {
324 SDVLOG(1) << "Unable to run a job because we're backing off.";
328 if (session_context_
->connection_manager()->HasInvalidAuthToken()) {
329 SDVLOG(1) << "Unable to run a job because we have no valid auth token.";
336 bool SyncSchedulerImpl::CanRunNudgeJobNow(JobPriority priority
) {
337 DCHECK(CalledOnValidThread());
339 if (!CanRunJobNow(priority
)) {
340 SDVLOG(1) << "Unable to run a nudge job right now";
344 const ModelTypeSet enabled_types
= session_context_
->GetEnabledTypes();
345 if (nudge_tracker_
.GetThrottledTypes().HasAll(enabled_types
)) {
346 SDVLOG(1) << "Not running a nudge because we're fully type throttled.";
350 if (mode_
== CONFIGURATION_MODE
) {
351 SDVLOG(1) << "Not running nudge because we're in configuration mode.";
358 void SyncSchedulerImpl::ScheduleLocalNudge(
359 const TimeDelta
& desired_delay
,
361 const tracked_objects::Location
& nudge_location
) {
362 DCHECK(CalledOnValidThread());
363 DCHECK(!types
.Empty());
365 SDVLOG_LOC(nudge_location
, 2)
366 << "Scheduling sync because of local change to "
367 << ModelTypeSetToString(types
);
368 UpdateNudgeTimeRecords(types
);
369 nudge_tracker_
.RecordLocalChange(types
);
370 ScheduleNudgeImpl(desired_delay
, nudge_location
);
373 void SyncSchedulerImpl::ScheduleLocalRefreshRequest(
374 const TimeDelta
& desired_delay
,
376 const tracked_objects::Location
& nudge_location
) {
377 DCHECK(CalledOnValidThread());
378 DCHECK(!types
.Empty());
380 SDVLOG_LOC(nudge_location
, 2)
381 << "Scheduling sync because of local refresh request for "
382 << ModelTypeSetToString(types
);
383 nudge_tracker_
.RecordLocalRefreshRequest(types
);
384 ScheduleNudgeImpl(desired_delay
, nudge_location
);
387 void SyncSchedulerImpl::ScheduleInvalidationNudge(
388 const TimeDelta
& desired_delay
,
389 syncer::ModelType model_type
,
390 scoped_ptr
<InvalidationInterface
> invalidation
,
391 const tracked_objects::Location
& nudge_location
) {
392 DCHECK(CalledOnValidThread());
394 SDVLOG_LOC(nudge_location
, 2)
395 << "Scheduling sync because we received invalidation for "
396 << ModelTypeToString(model_type
);
397 nudge_tracker_
.RecordRemoteInvalidation(model_type
, invalidation
.Pass());
398 ScheduleNudgeImpl(desired_delay
, nudge_location
);
401 // TODO(zea): Consider adding separate throttling/backoff for datatype
403 void SyncSchedulerImpl::ScheduleNudgeImpl(
404 const TimeDelta
& delay
,
405 const tracked_objects::Location
& nudge_location
) {
406 DCHECK(CalledOnValidThread());
408 if (no_scheduling_allowed_
) {
409 NOTREACHED() << "Illegal to schedule job while session in progress.";
414 SDVLOG_LOC(nudge_location
, 2)
415 << "Dropping nudge, scheduler is not running.";
419 SDVLOG_LOC(nudge_location
, 2)
420 << "In ScheduleNudgeImpl with delay "
421 << delay
.InMilliseconds() << " ms";
423 if (!CanRunNudgeJobNow(NORMAL_PRIORITY
))
426 TimeTicks incoming_run_time
= TimeTicks::Now() + delay
;
427 if (!scheduled_nudge_time_
.is_null() &&
428 (scheduled_nudge_time_
< incoming_run_time
)) {
429 // Old job arrives sooner than this one. Don't reschedule it.
433 // Either there is no existing nudge in flight or the incoming nudge should be
434 // made to arrive first (preempt) the existing nudge. We reschedule in either
436 SDVLOG_LOC(nudge_location
, 2)
437 << "Scheduling a nudge with "
438 << delay
.InMilliseconds() << " ms delay";
439 scheduled_nudge_time_
= incoming_run_time
;
440 pending_wakeup_timer_
.Start(
443 base::Bind(&SyncSchedulerImpl::PerformDelayedNudge
,
444 weak_ptr_factory_
.GetWeakPtr()));
447 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode
) {
449 ENUM_CASE(CONFIGURATION_MODE
);
450 ENUM_CASE(NORMAL_MODE
);
455 void SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority
) {
456 DCHECK(CalledOnValidThread());
457 DCHECK(CanRunNudgeJobNow(priority
));
459 DVLOG(2) << "Will run normal mode sync cycle with types "
460 << ModelTypeSetToString(session_context_
->GetEnabledTypes());
461 scoped_ptr
<SyncSession
> session(SyncSession::Build(session_context_
, this));
462 bool premature_exit
= !syncer_
->NormalSyncShare(
463 GetEnabledAndUnthrottledTypes(),
466 AdjustPolling(FORCE_RESET
);
467 // Don't run poll job till the next time poll timer fires.
468 do_poll_after_credentials_updated_
= false;
470 bool success
= !premature_exit
471 && !sessions::HasSyncerError(
472 session
->status_controller().model_neutral_state());
475 // That cycle took care of any outstanding work we had.
476 SDVLOG(2) << "Nudge succeeded.";
477 nudge_tracker_
.RecordSuccessfulSyncCycle();
478 scheduled_nudge_time_
= base::TimeTicks();
480 // If we're here, then we successfully reached the server. End all backoff.
481 wait_interval_
.reset();
482 NotifyRetryTime(base::Time());
484 HandleFailure(session
->status_controller().model_neutral_state());
488 void SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority
) {
489 DCHECK(CalledOnValidThread());
490 DCHECK_EQ(mode_
, CONFIGURATION_MODE
);
491 DCHECK(pending_configure_params_
!= NULL
);
493 if (!CanRunJobNow(priority
)) {
494 SDVLOG(2) << "Unable to run configure job right now.";
495 if (!pending_configure_params_
->retry_task
.is_null()) {
496 pending_configure_params_
->retry_task
.Run();
497 pending_configure_params_
->retry_task
.Reset();
502 SDVLOG(2) << "Will run configure SyncShare with types "
503 << ModelTypeSetToString(session_context_
->GetEnabledTypes());
504 scoped_ptr
<SyncSession
> session(SyncSession::Build(session_context_
, this));
505 bool premature_exit
= !syncer_
->ConfigureSyncShare(
506 pending_configure_params_
->types_to_download
,
507 pending_configure_params_
->source
,
509 AdjustPolling(FORCE_RESET
);
510 // Don't run poll job till the next time poll timer fires.
511 do_poll_after_credentials_updated_
= false;
513 bool success
= !premature_exit
514 && !sessions::HasSyncerError(
515 session
->status_controller().model_neutral_state());
518 SDVLOG(2) << "Configure succeeded.";
519 pending_configure_params_
->ready_task
.Run();
520 pending_configure_params_
.reset();
522 // If we're here, then we successfully reached the server. End all backoff.
523 wait_interval_
.reset();
524 NotifyRetryTime(base::Time());
526 HandleFailure(session
->status_controller().model_neutral_state());
527 // Sync cycle might receive response from server that causes scheduler to
528 // stop and draws pending_configure_params_ invalid.
529 if (started_
&& !pending_configure_params_
->retry_task
.is_null()) {
530 pending_configure_params_
->retry_task
.Run();
531 pending_configure_params_
->retry_task
.Reset();
536 void SyncSchedulerImpl::HandleFailure(
537 const sessions::ModelNeutralState
& model_neutral_state
) {
538 if (IsCurrentlyThrottled()) {
539 SDVLOG(2) << "Was throttled during previous sync cycle.";
541 } else if (!IsBackingOff()) {
542 // Setup our backoff if this is our first such failure.
543 TimeDelta length
= delay_provider_
->GetDelay(
544 delay_provider_
->GetInitialDelay(model_neutral_state
));
545 wait_interval_
.reset(
546 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF
, length
));
547 SDVLOG(2) << "Sync cycle failed. Will back off for "
548 << wait_interval_
->length
.InMilliseconds() << "ms.";
553 void SyncSchedulerImpl::DoPollSyncSessionJob() {
554 base::AutoReset
<bool> protector(&no_scheduling_allowed_
, true);
556 SDVLOG(2) << "Polling with types "
557 << ModelTypeSetToString(GetEnabledAndUnthrottledTypes());
558 scoped_ptr
<SyncSession
> session(SyncSession::Build(session_context_
, this));
559 syncer_
->PollSyncShare(
560 GetEnabledAndUnthrottledTypes(),
563 AdjustPolling(FORCE_RESET
);
565 if (IsCurrentlyThrottled()) {
566 SDVLOG(2) << "Poll request got us throttled.";
567 // The OnSilencedUntil() call set up the WaitInterval for us. All we need
568 // to do is start the timer.
573 void SyncSchedulerImpl::UpdateNudgeTimeRecords(ModelTypeSet types
) {
574 DCHECK(CalledOnValidThread());
575 base::TimeTicks now
= TimeTicks::Now();
576 // Update timing information for how often datatypes are triggering nudges.
577 for (ModelTypeSet::Iterator iter
= types
.First(); iter
.Good(); iter
.Inc()) {
578 base::TimeTicks previous
= last_local_nudges_by_model_type_
[iter
.Get()];
579 last_local_nudges_by_model_type_
[iter
.Get()] = now
;
580 if (previous
.is_null())
583 #define PER_DATA_TYPE_MACRO(type_str) \
584 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous);
585 SYNC_DATA_TYPE_HISTOGRAM(iter
.Get());
586 #undef PER_DATA_TYPE_MACRO
590 TimeDelta
SyncSchedulerImpl::GetPollInterval() {
591 return (!session_context_
->notifications_enabled() ||
592 !session_context_
->ShouldFetchUpdatesBeforeCommit()) ?
593 syncer_short_poll_interval_seconds_
:
594 syncer_long_poll_interval_seconds_
;
597 void SyncSchedulerImpl::AdjustPolling(PollAdjustType type
) {
598 DCHECK(CalledOnValidThread());
600 TimeDelta poll
= GetPollInterval();
601 bool rate_changed
= !poll_timer_
.IsRunning() ||
602 poll
!= poll_timer_
.GetCurrentDelay();
604 if (type
== FORCE_RESET
) {
605 last_poll_reset_
= base::TimeTicks::Now();
615 poll_timer_
.Start(FROM_HERE
, poll
, this,
616 &SyncSchedulerImpl::PollTimerCallback
);
619 void SyncSchedulerImpl::RestartWaiting() {
620 CHECK(wait_interval_
.get());
621 DCHECK(wait_interval_
->length
>= TimeDelta::FromSeconds(0));
622 NotifyRetryTime(base::Time::Now() + wait_interval_
->length
);
623 SDVLOG(2) << "Starting WaitInterval timer of length "
624 << wait_interval_
->length
.InMilliseconds() << "ms.";
625 if (wait_interval_
->mode
== WaitInterval::THROTTLED
) {
626 pending_wakeup_timer_
.Start(
628 wait_interval_
->length
,
629 base::Bind(&SyncSchedulerImpl::Unthrottle
,
630 weak_ptr_factory_
.GetWeakPtr()));
632 pending_wakeup_timer_
.Start(
634 wait_interval_
->length
,
635 base::Bind(&SyncSchedulerImpl::ExponentialBackoffRetry
,
636 weak_ptr_factory_
.GetWeakPtr()));
640 void SyncSchedulerImpl::Stop() {
641 DCHECK(CalledOnValidThread());
642 SDVLOG(2) << "Stop called";
644 // Kill any in-flight method calls.
645 weak_ptr_factory_
.InvalidateWeakPtrs();
646 wait_interval_
.reset();
647 NotifyRetryTime(base::Time());
649 pending_wakeup_timer_
.Stop();
650 pending_configure_params_
.reset();
655 // This is the only place where we invoke DoSyncSessionJob with canary
656 // privileges. Everyone else should use NORMAL_PRIORITY.
657 void SyncSchedulerImpl::TryCanaryJob() {
658 next_sync_session_job_priority_
= CANARY_PRIORITY
;
662 void SyncSchedulerImpl::TrySyncSessionJob() {
663 // Post call to TrySyncSessionJobImpl on current thread. Later request for
664 // access token will be here.
665 base::MessageLoop::current()->PostTask(FROM_HERE
, base::Bind(
666 &SyncSchedulerImpl::TrySyncSessionJobImpl
,
667 weak_ptr_factory_
.GetWeakPtr()));
670 void SyncSchedulerImpl::TrySyncSessionJobImpl() {
671 JobPriority priority
= next_sync_session_job_priority_
;
672 next_sync_session_job_priority_
= NORMAL_PRIORITY
;
674 nudge_tracker_
.SetSyncCycleStartTime(base::TimeTicks::Now());
676 DCHECK(CalledOnValidThread());
677 if (mode_
== CONFIGURATION_MODE
) {
678 if (pending_configure_params_
) {
679 SDVLOG(2) << "Found pending configure job";
680 DoConfigurationSyncSessionJob(priority
);
682 } else if (CanRunNudgeJobNow(priority
)) {
683 if (nudge_tracker_
.IsSyncRequired()) {
684 SDVLOG(2) << "Found pending nudge job";
685 DoNudgeSyncSessionJob(priority
);
686 } else if (do_poll_after_credentials_updated_
||
687 ((base::TimeTicks::Now() - last_poll_reset_
) >= GetPollInterval())) {
688 DoPollSyncSessionJob();
689 // Poll timer fires infrequently. Usually by this time access token is
690 // already expired and poll job will fail with auth error. Set flag to
691 // retry poll once ProfileSyncService gets new access token, TryCanaryJob
692 // will be called after access token is retrieved.
693 if (HttpResponse::SYNC_AUTH_ERROR
==
694 session_context_
->connection_manager()->server_status()) {
695 do_poll_after_credentials_updated_
= true;
700 if (priority
== CANARY_PRIORITY
) {
701 // If this is canary job then whatever result was don't run poll job till
702 // the next time poll timer fires.
703 do_poll_after_credentials_updated_
= false;
706 if (IsBackingOff() && !pending_wakeup_timer_
.IsRunning()) {
707 // If we succeeded, our wait interval would have been cleared. If it hasn't
708 // been cleared, then we should increase our backoff interval and schedule
710 TimeDelta length
= delay_provider_
->GetDelay(wait_interval_
->length
);
711 wait_interval_
.reset(
712 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF
, length
));
713 SDVLOG(2) << "Sync cycle failed. Will back off for "
714 << wait_interval_
->length
.InMilliseconds() << "ms.";
719 void SyncSchedulerImpl::PollTimerCallback() {
720 DCHECK(CalledOnValidThread());
721 if (no_scheduling_allowed_
) {
722 // The no_scheduling_allowed_ flag is set by a function-scoped AutoReset in
723 // functions that are called only on the sync thread. This function is also
724 // called only on the sync thread, and only when it is posted by an expiring
725 // timer. If we find that no_scheduling_allowed_ is set here, then
726 // something is very wrong. Maybe someone mistakenly called us directly, or
727 // mishandled the book-keeping for no_scheduling_allowed_.
728 NOTREACHED() << "Illegal to schedule job while session in progress.";
735 void SyncSchedulerImpl::RetryTimerCallback() {
739 void SyncSchedulerImpl::Unthrottle() {
740 DCHECK(CalledOnValidThread());
741 DCHECK_EQ(WaitInterval::THROTTLED
, wait_interval_
->mode
);
743 // We're no longer throttled, so clear the wait interval.
744 wait_interval_
.reset();
745 NotifyRetryTime(base::Time());
746 NotifyThrottledTypesChanged(nudge_tracker_
.GetThrottledTypes());
748 // We treat this as a 'canary' in the sense that it was originally scheduled
749 // to run some time ago, failed, and we now want to retry, versus a job that
750 // was just created (e.g via ScheduleNudgeImpl). The main implication is
751 // that we're careful to update routing info (etc) with such potentially
752 // stale canary jobs.
756 void SyncSchedulerImpl::TypeUnthrottle(base::TimeTicks unthrottle_time
) {
757 DCHECK(CalledOnValidThread());
758 nudge_tracker_
.UpdateTypeThrottlingState(unthrottle_time
);
759 NotifyThrottledTypesChanged(nudge_tracker_
.GetThrottledTypes());
761 if (nudge_tracker_
.IsAnyTypeThrottled()) {
762 const base::TimeTicks now
= base::TimeTicks::Now();
763 base::TimeDelta time_until_next_unthrottle
=
764 nudge_tracker_
.GetTimeUntilNextUnthrottle(now
);
765 type_unthrottle_timer_
.Start(
767 time_until_next_unthrottle
,
768 base::Bind(&SyncSchedulerImpl::TypeUnthrottle
,
769 weak_ptr_factory_
.GetWeakPtr(),
770 now
+ time_until_next_unthrottle
));
773 // Maybe this is a good time to run a nudge job. Let's try it.
774 if (nudge_tracker_
.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY
))
778 void SyncSchedulerImpl::PerformDelayedNudge() {
779 // Circumstances may have changed since we scheduled this delayed nudge.
780 // We must check to see if it's OK to run the job before we do so.
781 if (CanRunNudgeJobNow(NORMAL_PRIORITY
))
784 // We're not responsible for setting up any retries here. The functions that
785 // first put us into a state that prevents successful sync cycles (eg. global
786 // throttling, type throttling, network errors, transient errors) will also
787 // setup the appropriate retry logic (eg. retry after timeout, exponential
788 // backoff, retry when the network changes).
791 void SyncSchedulerImpl::ExponentialBackoffRetry() {
795 void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time
) {
796 FOR_EACH_OBSERVER(SyncEngineEventListener
,
797 *session_context_
->listeners(),
798 OnRetryTimeChanged(retry_time
));
801 void SyncSchedulerImpl::NotifyThrottledTypesChanged(ModelTypeSet types
) {
802 FOR_EACH_OBSERVER(SyncEngineEventListener
,
803 *session_context_
->listeners(),
804 OnThrottledTypesChanged(types
));
807 bool SyncSchedulerImpl::IsBackingOff() const {
808 DCHECK(CalledOnValidThread());
809 return wait_interval_
.get() && wait_interval_
->mode
==
810 WaitInterval::EXPONENTIAL_BACKOFF
;
813 void SyncSchedulerImpl::OnThrottled(const base::TimeDelta
& throttle_duration
) {
814 DCHECK(CalledOnValidThread());
815 wait_interval_
.reset(new WaitInterval(WaitInterval::THROTTLED
,
817 NotifyRetryTime(base::Time::Now() + wait_interval_
->length
);
818 NotifyThrottledTypesChanged(ModelTypeSet::All());
821 void SyncSchedulerImpl::OnTypesThrottled(
823 const base::TimeDelta
& throttle_duration
) {
824 base::TimeTicks now
= base::TimeTicks::Now();
826 nudge_tracker_
.SetTypesThrottledUntil(types
, throttle_duration
, now
);
827 base::TimeDelta time_until_next_unthrottle
=
828 nudge_tracker_
.GetTimeUntilNextUnthrottle(now
);
829 type_unthrottle_timer_
.Start(
831 time_until_next_unthrottle
,
832 base::Bind(&SyncSchedulerImpl::TypeUnthrottle
,
833 weak_ptr_factory_
.GetWeakPtr(),
834 now
+ time_until_next_unthrottle
));
835 NotifyThrottledTypesChanged(nudge_tracker_
.GetThrottledTypes());
838 bool SyncSchedulerImpl::IsCurrentlyThrottled() {
839 DCHECK(CalledOnValidThread());
840 return wait_interval_
.get() && wait_interval_
->mode
==
841 WaitInterval::THROTTLED
;
844 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate(
845 const base::TimeDelta
& new_interval
) {
846 DCHECK(CalledOnValidThread());
847 syncer_short_poll_interval_seconds_
= new_interval
;
850 void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate(
851 const base::TimeDelta
& new_interval
) {
852 DCHECK(CalledOnValidThread());
853 syncer_long_poll_interval_seconds_
= new_interval
;
856 void SyncSchedulerImpl::OnReceivedSessionsCommitDelay(
857 const base::TimeDelta
& new_delay
) {
858 DCHECK(CalledOnValidThread());
859 sessions_commit_delay_
= new_delay
;
862 void SyncSchedulerImpl::OnReceivedClientInvalidationHintBufferSize(int size
) {
864 nudge_tracker_
.SetHintBufferSize(size
);
866 NOTREACHED() << "Hint buffer size should be > 0.";
869 void SyncSchedulerImpl::OnSyncProtocolError(
870 const SyncProtocolError
& sync_protocol_error
) {
871 DCHECK(CalledOnValidThread());
872 if (ShouldRequestEarlyExit(sync_protocol_error
)) {
873 SDVLOG(2) << "Sync Scheduler requesting early exit.";
876 if (IsActionableError(sync_protocol_error
)) {
877 SDVLOG(2) << "OnActionableError";
878 FOR_EACH_OBSERVER(SyncEngineEventListener
,
879 *session_context_
->listeners(),
880 OnActionableError(sync_protocol_error
));
884 void SyncSchedulerImpl::OnReceivedGuRetryDelay(const base::TimeDelta
& delay
) {
885 nudge_tracker_
.SetNextRetryTime(TimeTicks::Now() + delay
);
886 retry_timer_
.Start(FROM_HERE
, delay
, this,
887 &SyncSchedulerImpl::RetryTimerCallback
);
890 void SyncSchedulerImpl::OnReceivedMigrationRequest(ModelTypeSet types
) {
891 FOR_EACH_OBSERVER(SyncEngineEventListener
,
892 *session_context_
->listeners(),
893 OnMigrationRequested(types
));
896 void SyncSchedulerImpl::SetNotificationsEnabled(bool notifications_enabled
) {
897 DCHECK(CalledOnValidThread());
898 session_context_
->set_notifications_enabled(notifications_enabled
);
899 if (notifications_enabled
)
900 nudge_tracker_
.OnInvalidationsEnabled();
902 nudge_tracker_
.OnInvalidationsDisabled();
905 base::TimeDelta
SyncSchedulerImpl::GetSessionsCommitDelay() const {
906 DCHECK(CalledOnValidThread());
907 return sessions_commit_delay_
;
918 } // namespace syncer