1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "sync/notifier/sync_invalidation_listener.h"
10 #include "base/callback.h"
11 #include "base/compiler_specific.h"
12 #include "base/logging.h"
13 #include "base/tracked_objects.h"
14 #include "google/cacheinvalidation/include/invalidation-client.h"
15 #include "google/cacheinvalidation/include/types.h"
16 #include "jingle/notifier/listener/push_client.h"
17 #include "sync/notifier/invalidation_util.h"
18 #include "sync/notifier/object_id_invalidation_map.h"
19 #include "sync/notifier/registration_manager.h"
23 const char kApplicationName
[] = "chrome-sync";
29 SyncInvalidationListener::Delegate::~Delegate() {}
31 SyncInvalidationListener::SyncInvalidationListener(
32 scoped_ptr
<SyncNetworkChannel
> network_channel
)
33 : sync_network_channel_(network_channel
.Pass()),
34 sync_system_resources_(sync_network_channel_
.get(), this),
36 ticl_state_(DEFAULT_INVALIDATION_ERROR
),
37 push_client_state_(DEFAULT_INVALIDATION_ERROR
),
38 weak_ptr_factory_(this) {
39 DCHECK(CalledOnValidThread());
40 sync_network_channel_
->AddObserver(this);
43 SyncInvalidationListener::~SyncInvalidationListener() {
44 DCHECK(CalledOnValidThread());
45 sync_network_channel_
->RemoveObserver(this);
50 void SyncInvalidationListener::Start(
51 const CreateInvalidationClientCallback
&
52 create_invalidation_client_callback
,
53 const std::string
& client_id
, const std::string
& client_info
,
54 const std::string
& invalidation_bootstrap_data
,
55 const UnackedInvalidationsMap
& initial_unacked_invalidations
,
56 const WeakHandle
<InvalidationStateTracker
>& invalidation_state_tracker
,
58 DCHECK(CalledOnValidThread());
61 sync_system_resources_
.set_platform(client_info
);
62 sync_system_resources_
.Start();
64 // The Storage resource is implemented as a write-through cache. We populate
65 // it with the initial state on startup, so subsequent writes go to disk and
66 // update the in-memory cache, while reads just return the cached state.
67 sync_system_resources_
.storage()->SetInitialState(
68 invalidation_bootstrap_data
);
70 unacked_invalidations_map_
= initial_unacked_invalidations
;
71 invalidation_state_tracker_
= invalidation_state_tracker
;
72 DCHECK(invalidation_state_tracker_
.IsInitialized());
78 invalidation_client_
.reset(create_invalidation_client_callback
.Run(
79 &sync_system_resources_
,
80 sync_network_channel_
->GetInvalidationClientType(),
84 invalidation_client_
->Start();
86 registration_manager_
.reset(
87 new RegistrationManager(invalidation_client_
.get()));
90 void SyncInvalidationListener::UpdateCredentials(
91 const std::string
& email
, const std::string
& token
) {
92 DCHECK(CalledOnValidThread());
93 sync_network_channel_
->UpdateCredentials(email
, token
);
96 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet
& ids
) {
97 DCHECK(CalledOnValidThread());
98 registered_ids_
= ids
;
99 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a
100 // working XMPP connection (as observed by us), so check it instead
101 // of GetState() (see http://crbug.com/139424).
102 if (ticl_state_
== INVALIDATIONS_ENABLED
&& registration_manager_
) {
103 DoRegistrationUpdate();
107 void SyncInvalidationListener::Ready(
108 invalidation::InvalidationClient
* client
) {
109 DCHECK(CalledOnValidThread());
110 DCHECK_EQ(client
, invalidation_client_
.get());
111 ticl_state_
= INVALIDATIONS_ENABLED
;
113 DoRegistrationUpdate();
116 void SyncInvalidationListener::Invalidate(
117 invalidation::InvalidationClient
* client
,
118 const invalidation::Invalidation
& invalidation
,
119 const invalidation::AckHandle
& ack_handle
) {
120 DCHECK(CalledOnValidThread());
121 DCHECK_EQ(client
, invalidation_client_
.get());
122 client
->Acknowledge(ack_handle
);
124 const invalidation::ObjectId
& id
= invalidation
.object_id();
127 // payload() CHECK()'s has_payload(), so we must check it ourselves first.
128 if (invalidation
.has_payload())
129 payload
= invalidation
.payload();
131 DVLOG(2) << "Received invalidation with version " << invalidation
.version()
132 << " for " << ObjectIdToString(id
);
134 ObjectIdInvalidationMap invalidations
;
135 Invalidation inv
= Invalidation::Init(id
, invalidation
.version(), payload
);
136 inv
.set_ack_handler(GetThisAsAckHandler());
137 invalidations
.Insert(inv
);
139 DispatchInvalidations(invalidations
);
142 void SyncInvalidationListener::InvalidateUnknownVersion(
143 invalidation::InvalidationClient
* client
,
144 const invalidation::ObjectId
& object_id
,
145 const invalidation::AckHandle
& ack_handle
) {
146 DCHECK(CalledOnValidThread());
147 DCHECK_EQ(client
, invalidation_client_
.get());
148 DVLOG(1) << "InvalidateUnknownVersion";
149 client
->Acknowledge(ack_handle
);
151 ObjectIdInvalidationMap invalidations
;
152 Invalidation unknown_version
= Invalidation::InitUnknownVersion(object_id
);
153 unknown_version
.set_ack_handler(GetThisAsAckHandler());
154 invalidations
.Insert(unknown_version
);
156 DispatchInvalidations(invalidations
);
159 // This should behave as if we got an invalidation with version
160 // UNKNOWN_OBJECT_VERSION for all known data types.
161 void SyncInvalidationListener::InvalidateAll(
162 invalidation::InvalidationClient
* client
,
163 const invalidation::AckHandle
& ack_handle
) {
164 DCHECK(CalledOnValidThread());
165 DCHECK_EQ(client
, invalidation_client_
.get());
166 DVLOG(1) << "InvalidateAll";
167 client
->Acknowledge(ack_handle
);
169 ObjectIdInvalidationMap invalidations
;
170 for (ObjectIdSet::iterator it
= registered_ids_
.begin();
171 it
!= registered_ids_
.end(); ++it
) {
172 Invalidation unknown_version
= Invalidation::InitUnknownVersion(*it
);
173 unknown_version
.set_ack_handler(GetThisAsAckHandler());
174 invalidations
.Insert(unknown_version
);
177 DispatchInvalidations(invalidations
);
180 // If a handler is registered, emit right away. Otherwise, save it for later.
181 void SyncInvalidationListener::DispatchInvalidations(
182 const ObjectIdInvalidationMap
& invalidations
) {
183 DCHECK(CalledOnValidThread());
185 ObjectIdInvalidationMap to_save
= invalidations
;
186 ObjectIdInvalidationMap to_emit
=
187 invalidations
.GetSubsetWithObjectIds(registered_ids_
);
189 SaveInvalidations(to_save
);
190 EmitSavedInvalidations(to_emit
);
193 void SyncInvalidationListener::SaveInvalidations(
194 const ObjectIdInvalidationMap
& to_save
) {
195 ObjectIdSet objects_to_save
= to_save
.GetObjectIds();
196 for (ObjectIdSet::const_iterator it
= objects_to_save
.begin();
197 it
!= objects_to_save
.end(); ++it
) {
198 UnackedInvalidationsMap::iterator lookup
=
199 unacked_invalidations_map_
.find(*it
);
200 if (lookup
== unacked_invalidations_map_
.end()) {
201 lookup
= unacked_invalidations_map_
.insert(
202 std::make_pair(*it
, UnackedInvalidationSet(*it
))).first
;
204 lookup
->second
.AddSet(to_save
.ForObject(*it
));
207 invalidation_state_tracker_
.Call(
209 &InvalidationStateTracker::SetSavedInvalidations
,
210 unacked_invalidations_map_
);
213 void SyncInvalidationListener::EmitSavedInvalidations(
214 const ObjectIdInvalidationMap
& to_emit
) {
215 DVLOG(2) << "Emitting invalidations: " << to_emit
.ToString();
216 delegate_
->OnInvalidate(to_emit
);
219 void SyncInvalidationListener::InformRegistrationStatus(
220 invalidation::InvalidationClient
* client
,
221 const invalidation::ObjectId
& object_id
,
222 InvalidationListener::RegistrationState new_state
) {
223 DCHECK(CalledOnValidThread());
224 DCHECK_EQ(client
, invalidation_client_
.get());
225 DVLOG(1) << "InformRegistrationStatus: "
226 << ObjectIdToString(object_id
) << " " << new_state
;
228 if (new_state
!= InvalidationListener::REGISTERED
) {
229 // Let |registration_manager_| handle the registration backoff policy.
230 registration_manager_
->MarkRegistrationLost(object_id
);
234 void SyncInvalidationListener::InformRegistrationFailure(
235 invalidation::InvalidationClient
* client
,
236 const invalidation::ObjectId
& object_id
,
238 const std::string
& error_message
) {
239 DCHECK(CalledOnValidThread());
240 DCHECK_EQ(client
, invalidation_client_
.get());
241 DVLOG(1) << "InformRegistrationFailure: "
242 << ObjectIdToString(object_id
)
243 << "is_transient=" << is_transient
244 << ", message=" << error_message
;
247 // We don't care about |unknown_hint|; we let
248 // |registration_manager_| handle the registration backoff policy.
249 registration_manager_
->MarkRegistrationLost(object_id
);
251 // Non-transient failures require an action to resolve. This could happen
253 // - the server doesn't yet recognize the data type, which could happen for
254 // brand-new data types.
255 // - the user has changed his password and hasn't updated it yet locally.
256 // Either way, block future registration attempts for |object_id|. However,
257 // we don't forget any saved invalidation state since we may use it once the
258 // error is addressed.
259 registration_manager_
->DisableId(object_id
);
263 void SyncInvalidationListener::ReissueRegistrations(
264 invalidation::InvalidationClient
* client
,
265 const std::string
& prefix
,
267 DCHECK(CalledOnValidThread());
268 DCHECK_EQ(client
, invalidation_client_
.get());
269 DVLOG(1) << "AllRegistrationsLost";
270 registration_manager_
->MarkAllRegistrationsLost();
273 void SyncInvalidationListener::InformError(
274 invalidation::InvalidationClient
* client
,
275 const invalidation::ErrorInfo
& error_info
) {
276 DCHECK(CalledOnValidThread());
277 DCHECK_EQ(client
, invalidation_client_
.get());
278 LOG(ERROR
) << "Ticl error " << error_info
.error_reason() << ": "
279 << error_info
.error_message()
280 << " (transient = " << error_info
.is_transient() << ")";
281 if (error_info
.error_reason() == invalidation::ErrorReason::AUTH_FAILURE
) {
282 ticl_state_
= INVALIDATION_CREDENTIALS_REJECTED
;
284 ticl_state_
= TRANSIENT_INVALIDATION_ERROR
;
289 void SyncInvalidationListener::Acknowledge(
290 const invalidation::ObjectId
& id
,
291 const syncer::AckHandle
& handle
) {
292 UnackedInvalidationsMap::iterator lookup
=
293 unacked_invalidations_map_
.find(id
);
294 if (lookup
== unacked_invalidations_map_
.end()) {
295 DLOG(WARNING
) << "Received acknowledgement for untracked object ID";
298 lookup
->second
.Acknowledge(handle
);
299 invalidation_state_tracker_
.Call(
301 &InvalidationStateTracker::SetSavedInvalidations
,
302 unacked_invalidations_map_
);
305 void SyncInvalidationListener::Drop(
306 const invalidation::ObjectId
& id
,
307 const syncer::AckHandle
& handle
) {
308 UnackedInvalidationsMap::iterator lookup
=
309 unacked_invalidations_map_
.find(id
);
310 if (lookup
== unacked_invalidations_map_
.end()) {
311 DLOG(WARNING
) << "Received drop for untracked object ID";
314 lookup
->second
.Drop(handle
);
315 invalidation_state_tracker_
.Call(
317 &InvalidationStateTracker::SetSavedInvalidations
,
318 unacked_invalidations_map_
);
321 void SyncInvalidationListener::WriteState(const std::string
& state
) {
322 DCHECK(CalledOnValidThread());
323 DVLOG(1) << "WriteState";
324 invalidation_state_tracker_
.Call(
325 FROM_HERE
, &InvalidationStateTracker::SetBootstrapData
, state
);
328 void SyncInvalidationListener::DoRegistrationUpdate() {
329 DCHECK(CalledOnValidThread());
330 const ObjectIdSet
& unregistered_ids
=
331 registration_manager_
->UpdateRegisteredIds(registered_ids_
);
332 for (ObjectIdSet::iterator it
= unregistered_ids
.begin();
333 it
!= unregistered_ids
.end(); ++it
) {
334 unacked_invalidations_map_
.erase(*it
);
336 invalidation_state_tracker_
.Call(
338 &InvalidationStateTracker::SetSavedInvalidations
,
339 unacked_invalidations_map_
);
341 ObjectIdInvalidationMap object_id_invalidation_map
;
342 for (UnackedInvalidationsMap::iterator map_it
=
343 unacked_invalidations_map_
.begin();
344 map_it
!= unacked_invalidations_map_
.end(); ++map_it
) {
345 if (registered_ids_
.find(map_it
->first
) == registered_ids_
.end()) {
348 map_it
->second
.ExportInvalidations(
349 GetThisAsAckHandler(),
350 &object_id_invalidation_map
);
353 // There's no need to run these through DispatchInvalidations(); they've
354 // already been saved to storage (that's where we found them) so all we need
355 // to do now is emit them.
356 EmitSavedInvalidations(object_id_invalidation_map
);
359 void SyncInvalidationListener::RequestDetailedStatus(
360 base::Callback
<void(const base::DictionaryValue
&)> callback
) const {
361 DCHECK(CalledOnValidThread());
362 sync_network_channel_
->RequestDetailedStatus(callback
);
363 callback
.Run(*CollectDebugData());
366 scoped_ptr
<base::DictionaryValue
>
367 SyncInvalidationListener::CollectDebugData() const {
368 scoped_ptr
<base::DictionaryValue
> return_value(new base::DictionaryValue());
369 return_value
->SetString(
370 "SyncInvalidationListener.PushClientState",
371 std::string(InvalidatorStateToString(push_client_state_
)));
372 return_value
->SetString("SyncInvalidationListener.TiclState",
373 std::string(InvalidatorStateToString(ticl_state_
)));
374 scoped_ptr
<base::DictionaryValue
> unacked_map(new base::DictionaryValue());
375 for (UnackedInvalidationsMap::const_iterator it
=
376 unacked_invalidations_map_
.begin();
377 it
!= unacked_invalidations_map_
.end();
379 unacked_map
->Set((it
->first
).name(), (it
->second
).ToValue().release());
381 return_value
->Set("SyncInvalidationListener.UnackedInvalidationsMap",
382 unacked_map
.release());
383 return return_value
.Pass();
386 void SyncInvalidationListener::StopForTest() {
387 DCHECK(CalledOnValidThread());
391 void SyncInvalidationListener::Stop() {
392 DCHECK(CalledOnValidThread());
393 if (!invalidation_client_
) {
397 registration_manager_
.reset();
398 sync_system_resources_
.Stop();
399 invalidation_client_
->Stop();
401 invalidation_client_
.reset();
404 ticl_state_
= DEFAULT_INVALIDATION_ERROR
;
405 push_client_state_
= DEFAULT_INVALIDATION_ERROR
;
408 InvalidatorState
SyncInvalidationListener::GetState() const {
409 DCHECK(CalledOnValidThread());
410 if (ticl_state_
== INVALIDATION_CREDENTIALS_REJECTED
||
411 push_client_state_
== INVALIDATION_CREDENTIALS_REJECTED
) {
412 // If either the ticl or the push client rejected our credentials,
413 // return INVALIDATION_CREDENTIALS_REJECTED.
414 return INVALIDATION_CREDENTIALS_REJECTED
;
416 if (ticl_state_
== INVALIDATIONS_ENABLED
&&
417 push_client_state_
== INVALIDATIONS_ENABLED
) {
418 // If the ticl is ready and the push client notifications are
419 // enabled, return INVALIDATIONS_ENABLED.
420 return INVALIDATIONS_ENABLED
;
422 // Otherwise, we have a transient error.
423 return TRANSIENT_INVALIDATION_ERROR
;
426 void SyncInvalidationListener::EmitStateChange() {
427 DCHECK(CalledOnValidThread());
428 delegate_
->OnInvalidatorStateChange(GetState());
431 WeakHandle
<AckHandler
> SyncInvalidationListener::GetThisAsAckHandler() {
432 DCHECK(CalledOnValidThread());
433 return WeakHandle
<AckHandler
>(weak_ptr_factory_
.GetWeakPtr());
436 void SyncInvalidationListener::OnNetworkChannelStateChanged(
437 InvalidatorState invalidator_state
) {
438 DCHECK(CalledOnValidThread());
439 push_client_state_
= invalidator_state
;
443 } // namespace syncer