Merge Chromium + Blink git repositories
[chromium-blink-merge.git] / sync / engine / model_type_worker.cc
blob7e67dc87278410dff83e7e83bfd1e31c4c47877e
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "sync/engine/model_type_worker.h"
7 #include "base/bind.h"
8 #include "base/format_macros.h"
9 #include "base/guid.h"
10 #include "base/logging.h"
11 #include "base/strings/stringprintf.h"
12 #include "sync/engine/commit_contribution.h"
13 #include "sync/engine/entity_tracker.h"
14 #include "sync/engine/model_type_processor.h"
15 #include "sync/engine/non_blocking_type_commit_contribution.h"
16 #include "sync/syncable/syncable_util.h"
17 #include "sync/util/cryptographer.h"
18 #include "sync/util/time.h"
20 namespace syncer_v2 {
22 using syncer::CommitContribution;
23 using syncer::Cryptographer;
24 using syncer::ModelType;
25 using syncer::NudgeHandler;
26 using syncer::SyncerError;
28 ModelTypeWorker::ModelTypeWorker(
29 ModelType type,
30 const DataTypeState& initial_state,
31 const UpdateResponseDataList& saved_pending_updates,
32 scoped_ptr<Cryptographer> cryptographer,
33 NudgeHandler* nudge_handler,
34 scoped_ptr<ModelTypeProcessor> model_type_processor)
35 : type_(type),
36 data_type_state_(initial_state),
37 model_type_processor_(model_type_processor.Pass()),
38 cryptographer_(cryptographer.Pass()),
39 nudge_handler_(nudge_handler),
40 weak_ptr_factory_(this) {
41 // Request an initial sync if it hasn't been completed yet.
42 if (!data_type_state_.initial_sync_done) {
43 nudge_handler_->NudgeForInitialDownload(type_);
46 for (UpdateResponseDataList::const_iterator it =
47 saved_pending_updates.begin();
48 it != saved_pending_updates.end(); ++it) {
49 scoped_ptr<EntityTracker> entity_tracker =
50 EntityTracker::FromUpdateResponse(*it);
51 entity_tracker->ReceivePendingUpdate(*it);
52 entities_.insert(it->client_tag_hash, entity_tracker.Pass());
55 if (cryptographer_) {
56 DVLOG(1) << ModelTypeToString(type_) << ": Starting with encryption key "
57 << cryptographer_->GetDefaultNigoriKeyName();
58 OnCryptographerUpdated();
62 ModelTypeWorker::~ModelTypeWorker() {}
64 ModelType ModelTypeWorker::GetModelType() const {
65 DCHECK(CalledOnValidThread());
66 return type_;
69 bool ModelTypeWorker::IsEncryptionRequired() const {
70 return !!cryptographer_;
73 void ModelTypeWorker::UpdateCryptographer(
74 scoped_ptr<Cryptographer> cryptographer) {
75 DCHECK(cryptographer);
76 cryptographer_ = cryptographer.Pass();
78 // Update our state and that of the proxy.
79 OnCryptographerUpdated();
81 // Nudge the scheduler if we're now allowed to commit.
82 if (CanCommitItems())
83 nudge_handler_->NudgeForCommit(type_);
86 // UpdateHandler implementation.
87 void ModelTypeWorker::GetDownloadProgress(
88 sync_pb::DataTypeProgressMarker* progress_marker) const {
89 DCHECK(CalledOnValidThread());
90 progress_marker->CopyFrom(data_type_state_.progress_marker);
93 void ModelTypeWorker::GetDataTypeContext(
94 sync_pb::DataTypeContext* context) const {
95 DCHECK(CalledOnValidThread());
96 context->CopyFrom(data_type_state_.type_context);
99 SyncerError ModelTypeWorker::ProcessGetUpdatesResponse(
100 const sync_pb::DataTypeProgressMarker& progress_marker,
101 const sync_pb::DataTypeContext& mutated_context,
102 const SyncEntityList& applicable_updates,
103 syncer::sessions::StatusController* status) {
104 DCHECK(CalledOnValidThread());
106 // TODO(rlarocque): Handle data type context conflicts.
107 data_type_state_.type_context = mutated_context;
108 data_type_state_.progress_marker = progress_marker;
110 UpdateResponseDataList response_datas;
111 UpdateResponseDataList pending_updates;
113 for (SyncEntityList::const_iterator update_it = applicable_updates.begin();
114 update_it != applicable_updates.end(); ++update_it) {
115 const sync_pb::SyncEntity* update_entity = *update_it;
117 // Skip updates for permanent folders.
118 // TODO(stanisc): crbug.com/516866: might need to handle this for
119 // hierarchical datatypes.
120 if (!update_entity->server_defined_unique_tag().empty())
121 continue;
123 // Normal updates are handled here.
124 const std::string& client_tag_hash =
125 update_entity->client_defined_unique_tag();
127 // TODO(stanisc): crbug.com/516866: this wouldn't be true for bookmarks.
128 DCHECK(!client_tag_hash.empty());
130 // Prepare the message for the model thread.
131 UpdateResponseData response_data;
132 response_data.id = update_entity->id_string();
133 response_data.client_tag_hash = client_tag_hash;
134 response_data.response_version = update_entity->version();
135 response_data.ctime = syncer::ProtoTimeToTime(update_entity->ctime());
136 response_data.mtime = syncer::ProtoTimeToTime(update_entity->mtime());
137 response_data.non_unique_name = update_entity->name();
138 response_data.deleted = update_entity->deleted();
140 EntityTracker* entity_tracker = nullptr;
141 EntityMap::const_iterator map_it = entities_.find(client_tag_hash);
142 if (map_it == entities_.end()) {
143 scoped_ptr<EntityTracker> scoped_entity_tracker =
144 EntityTracker::FromUpdateResponse(response_data);
145 entity_tracker = scoped_entity_tracker.get();
146 entities_.insert(client_tag_hash, scoped_entity_tracker.Pass());
147 } else {
148 entity_tracker = map_it->second;
151 const sync_pb::EntitySpecifics& specifics = update_entity->specifics();
153 if (!specifics.has_encrypted()) {
154 // No encryption.
155 entity_tracker->ReceiveUpdate(update_entity->version());
156 response_data.specifics = specifics;
157 response_datas.push_back(response_data);
158 } else if (specifics.has_encrypted() && cryptographer_ &&
159 cryptographer_->CanDecrypt(specifics.encrypted())) {
160 // Encrypted, but we know the key.
161 if (DecryptSpecifics(cryptographer_.get(), specifics,
162 &response_data.specifics)) {
163 entity_tracker->ReceiveUpdate(update_entity->version());
164 response_data.encryption_key_name = specifics.encrypted().key_name();
165 response_datas.push_back(response_data);
167 } else if (specifics.has_encrypted() &&
168 (!cryptographer_ ||
169 !cryptographer_->CanDecrypt(specifics.encrypted()))) {
170 // Can't decrypt right now. Ask the entity tracker to handle it.
171 response_data.specifics = specifics;
172 if (entity_tracker->ReceivePendingUpdate(response_data)) {
173 // Send to the model thread for safe-keeping across restarts if the
174 // tracker decides the update is worth keeping.
175 pending_updates.push_back(response_data);
180 DVLOG(1) << ModelTypeToString(type_) << ": "
181 << base::StringPrintf(
182 "Delivering %zd applicable and %zd pending updates.",
183 response_datas.size(), pending_updates.size());
185 // Forward these updates to the model thread so it can do the rest.
186 model_type_processor_->OnUpdateReceived(data_type_state_, response_datas,
187 pending_updates);
189 return syncer::SYNCER_OK;
192 void ModelTypeWorker::ApplyUpdates(syncer::sessions::StatusController* status) {
193 DCHECK(CalledOnValidThread());
194 // This function is called only when we've finished a download cycle, ie. we
195 // got a response with changes_remaining == 0. If this is our first download
196 // cycle, we should update our state so the ModelTypeProcessor knows that
197 // it's safe to commit items now.
198 if (!data_type_state_.initial_sync_done) {
199 DVLOG(1) << "Delivering 'initial sync done' ping.";
201 data_type_state_.initial_sync_done = true;
203 model_type_processor_->OnUpdateReceived(
204 data_type_state_, UpdateResponseDataList(), UpdateResponseDataList());
208 void ModelTypeWorker::PassiveApplyUpdates(
209 syncer::sessions::StatusController* status) {
210 NOTREACHED()
211 << "Non-blocking types should never apply updates on sync thread. "
212 << "ModelType is: " << ModelTypeToString(type_);
215 void ModelTypeWorker::EnqueueForCommit(const CommitRequestDataList& list) {
216 DCHECK(CalledOnValidThread());
218 DCHECK(IsTypeInitialized())
219 << "Asked to commit items before type was initialized. "
220 << "ModelType is: " << ModelTypeToString(type_);
222 for (CommitRequestDataList::const_iterator it = list.begin();
223 it != list.end(); ++it) {
224 StorePendingCommit(*it);
227 if (CanCommitItems())
228 nudge_handler_->NudgeForCommit(type_);
231 // CommitContributor implementation.
232 scoped_ptr<CommitContribution> ModelTypeWorker::GetContribution(
233 size_t max_entries) {
234 DCHECK(CalledOnValidThread());
236 size_t space_remaining = max_entries;
237 std::vector<int64> sequence_numbers;
238 google::protobuf::RepeatedPtrField<sync_pb::SyncEntity> commit_entities;
240 if (!CanCommitItems())
241 return scoped_ptr<CommitContribution>();
243 // TODO(rlarocque): Avoid iterating here.
244 for (EntityMap::const_iterator it = entities_.begin();
245 it != entities_.end() && space_remaining > 0; ++it) {
246 EntityTracker* entity = it->second;
247 if (entity->HasPendingCommit()) {
248 sync_pb::SyncEntity* commit_entity = commit_entities.Add();
249 int64 sequence_number = -1;
251 entity->PrepareCommitProto(commit_entity, &sequence_number);
252 HelpInitializeCommitEntity(commit_entity);
253 sequence_numbers.push_back(sequence_number);
255 space_remaining--;
259 if (commit_entities.size() == 0)
260 return scoped_ptr<CommitContribution>();
262 return scoped_ptr<CommitContribution>(new NonBlockingTypeCommitContribution(
263 data_type_state_.type_context, commit_entities, sequence_numbers, this));
266 void ModelTypeWorker::StorePendingCommit(const CommitRequestData& request) {
267 if (!request.deleted) {
268 DCHECK_EQ(type_, syncer::GetModelTypeFromSpecifics(request.specifics));
271 EntityTracker* entity;
272 EntityMap::const_iterator map_it = entities_.find(request.client_tag_hash);
273 if (map_it == entities_.end()) {
274 scoped_ptr<EntityTracker> scoped_entity =
275 EntityTracker::FromCommitRequest(request);
276 entity = scoped_entity.get();
277 entities_.insert(request.client_tag_hash, scoped_entity.Pass());
278 } else {
279 entity = map_it->second;
281 entity->RequestCommit(request);
284 void ModelTypeWorker::OnCommitResponse(
285 const CommitResponseDataList& response_list) {
286 for (CommitResponseDataList::const_iterator response_it =
287 response_list.begin();
288 response_it != response_list.end(); ++response_it) {
289 const std::string client_tag_hash = response_it->client_tag_hash;
290 EntityMap::const_iterator map_it = entities_.find(client_tag_hash);
292 // There's no way we could have committed an entry we know nothing about.
293 if (map_it == entities_.end()) {
294 NOTREACHED() << "Received commit response for item unknown to us."
295 << " Model type: " << ModelTypeToString(type_)
296 << " ID: " << response_it->id;
297 continue;
300 EntityTracker* entity = map_it->second;
301 entity->ReceiveCommitResponse(response_it->id,
302 response_it->response_version,
303 response_it->sequence_number);
306 // Send the responses back to the model thread. It needs to know which
307 // items have been successfully committed so it can save that information in
308 // permanent storage.
309 model_type_processor_->OnCommitCompleted(data_type_state_, response_list);
312 base::WeakPtr<ModelTypeWorker> ModelTypeWorker::AsWeakPtr() {
313 return weak_ptr_factory_.GetWeakPtr();
316 bool ModelTypeWorker::IsTypeInitialized() const {
317 return data_type_state_.initial_sync_done &&
318 !data_type_state_.progress_marker.token().empty();
321 bool ModelTypeWorker::CanCommitItems() const {
322 // We can't commit anything until we know the type's parent node.
323 // We'll get it in the first update response.
324 if (!IsTypeInitialized())
325 return false;
327 // Don't commit if we should be encrypting but don't have the required keys.
328 if (IsEncryptionRequired() &&
329 (!cryptographer_ || !cryptographer_->is_ready())) {
330 return false;
333 return true;
336 void ModelTypeWorker::HelpInitializeCommitEntity(
337 sync_pb::SyncEntity* sync_entity) {
338 DCHECK(CanCommitItems());
340 // Initial commits need our help to generate a client ID.
341 if (!sync_entity->has_id_string()) {
342 DCHECK_EQ(kUncommittedVersion, sync_entity->version());
343 // TODO(stanisc): This is incorrect for bookmarks for two reasons:
344 // 1) Won't be able to match previously committed bookmarks to the ones
345 // with server ID.
346 // 2) Recommitting an item in a case of failing to receive commit response
347 // would result in generating a different client ID, which in turn
348 // would result in a duplication.
349 // We should generate client ID on the frontend side instead.
350 sync_entity->set_id_string(base::GenerateGUID());
353 // Encrypt the specifics and hide the title if necessary.
354 if (IsEncryptionRequired()) {
355 // IsEncryptionRequired() && CanCommitItems() implies
356 // that the cryptographer is valid and ready to encrypt.
357 sync_pb::EntitySpecifics encrypted_specifics;
358 bool result = cryptographer_->Encrypt(
359 sync_entity->specifics(), encrypted_specifics.mutable_encrypted());
360 DCHECK(result);
361 sync_entity->mutable_specifics()->CopyFrom(encrypted_specifics);
362 sync_entity->set_name("encrypted");
365 // Always include enough specifics to identify the type. Do this even in
366 // deletion requests, where the specifics are otherwise invalid.
367 AddDefaultFieldValue(type_, sync_entity->mutable_specifics());
369 // TODO(stanisc): crbug.com/516866:
370 // Call sync_entity->set_parent_id_string(...) for hierarchical entities here.
373 void ModelTypeWorker::OnCryptographerUpdated() {
374 DCHECK(cryptographer_);
376 bool new_encryption_key = false;
377 UpdateResponseDataList response_datas;
379 const std::string& new_key_name = cryptographer_->GetDefaultNigoriKeyName();
381 // Handle a change in encryption key.
382 if (data_type_state_.encryption_key_name != new_key_name) {
383 DVLOG(1) << ModelTypeToString(type_) << ": Updating encryption key "
384 << data_type_state_.encryption_key_name << " -> " << new_key_name;
385 data_type_state_.encryption_key_name = new_key_name;
386 new_encryption_key = true;
389 for (EntityMap::const_iterator it = entities_.begin(); it != entities_.end();
390 ++it) {
391 if (it->second->HasPendingUpdate()) {
392 const UpdateResponseData& saved_pending = it->second->GetPendingUpdate();
394 // We assume all pending updates are encrypted items for which we
395 // don't have the key.
396 DCHECK(saved_pending.specifics.has_encrypted());
398 if (cryptographer_->CanDecrypt(saved_pending.specifics.encrypted())) {
399 UpdateResponseData decrypted_response = saved_pending;
400 if (DecryptSpecifics(cryptographer_.get(), saved_pending.specifics,
401 &decrypted_response.specifics)) {
402 decrypted_response.encryption_key_name =
403 saved_pending.specifics.encrypted().key_name();
404 response_datas.push_back(decrypted_response);
406 it->second->ClearPendingUpdate();
412 if (new_encryption_key || response_datas.size() > 0) {
413 DVLOG(1) << ModelTypeToString(type_) << ": "
414 << base::StringPrintf(
415 "Delivering encryption key and %zd decrypted updates.",
416 response_datas.size());
417 model_type_processor_->OnUpdateReceived(data_type_state_, response_datas,
418 UpdateResponseDataList());
422 bool ModelTypeWorker::DecryptSpecifics(Cryptographer* cryptographer,
423 const sync_pb::EntitySpecifics& in,
424 sync_pb::EntitySpecifics* out) {
425 DCHECK(in.has_encrypted());
426 DCHECK(cryptographer->CanDecrypt(in.encrypted()));
428 std::string plaintext;
429 plaintext = cryptographer->DecryptToString(in.encrypted());
430 if (plaintext.empty()) {
431 LOG(ERROR) << "Failed to decrypt a decryptable entity";
432 return false;
434 if (!out->ParseFromString(plaintext)) {
435 LOG(ERROR) << "Failed to parse decrypted entity";
436 return false;
438 return true;
441 } // namespace syncer_v2