1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 this.EXPORTED_SYMBOLS = [
13 const {classes: Cc, interfaces: Ci, results: Cr, utils: Cu} = Components;
15 Cu.import("resource://services-common/async.js");
16 Cu.import("resource://gre/modules/Log.jsm");
17 Cu.import("resource://services-common/observers.js");
18 Cu.import("resource://services-common/utils.js");
19 Cu.import("resource://services-sync/constants.js");
20 Cu.import("resource://services-sync/identity.js");
21 Cu.import("resource://services-sync/record.js");
22 Cu.import("resource://services-sync/resource.js");
23 Cu.import("resource://services-sync/util.js");
26 * Trackers are associated with a single engine and deal with
27 * listening for changes to their particular data type.
29 * There are two things they keep track of:
30 * 1) A score, indicating how urgently the engine wants to sync
31 * 2) A list of IDs for all the changed items that need to be synced
32 * and updating their 'score', indicating how urgently they
36 this.Tracker = function Tracker(name, engine) {
38 throw new Error("Tracker must be associated with an Engine instance.");
41 name = name || "Unnamed";
42 this.name = this.file = name.toLowerCase();
45 this._log = Log.repository.getLogger("Sync.Tracker." + name);
46 let level = Svc.Prefs.get("log.logger.engine." + this.name, "Debug");
47 this._log.level = Log.Level[level];
51 this.ignoreAll = false;
53 this.loadChangedIDs();
55 Svc.Obs.add("weave:engine:start-tracking", this);
56 Svc.Obs.add("weave:engine:stop-tracking", this);
61 * Score can be called as often as desired to decide which engines to sync
63 * Valid values for score:
64 * -1: Do not sync unless the user specifically requests it (almost disabled)
65 * 0: Nothing has changed
66 * 100: Please sync me ASAP!
68 * Setting it to other values should (but doesn't currently) throw an exception
76 Observers.notify("weave:engine:score:updated", this.name);
79 // Should be called by service everytime a sync has been done for an engine
80 resetScore: function () {
84 persistChangedIDs: true,
87 * Persist changedIDs to disk at a later date.
88 * Optionally pass a callback to be invoked when the write has occurred.
90 saveChangedIDs: function (cb) {
91 if (!this.persistChangedIDs) {
92 this._log.debug("Not saving changedIDs.");
95 Utils.namedTimer(function () {
96 this._log.debug("Saving changed IDs to " + this.file);
97 Utils.jsonSave("changes/" + this.file, this, this.changedIDs, cb);
98 }, 1000, this, "_lazySave");
101 loadChangedIDs: function (cb) {
102 Utils.jsonLoad("changes/" + this.file, this, function(json) {
103 if (json && (typeof(json) == "object")) {
104 this.changedIDs = json;
106 this._log.warn("Changed IDs file " + this.file + " contains non-object value.");
115 // ignore/unignore specific IDs. Useful for ignoring items that are
116 // being processed, or that shouldn't be synced.
117 // But note: not persisted to disk
119 ignoreID: function (id) {
121 this._ignored.push(id);
124 unignoreID: function (id) {
125 let index = this._ignored.indexOf(id);
127 this._ignored.splice(index, 1);
130 addChangedID: function (id, when) {
132 this._log.warn("Attempted to add undefined ID to tracker");
136 if (this.ignoreAll || (id in this._ignored)) {
140 // Default to the current time in seconds if no time is provided.
142 when = Math.floor(Date.now() / 1000);
145 // Add/update the entry if we have a newer time.
146 if ((this.changedIDs[id] || -Infinity) < when) {
147 this._log.trace("Adding changed ID: " + id + ", " + when);
148 this.changedIDs[id] = when;
149 this.saveChangedIDs(this.onSavedChangedIDs);
155 removeChangedID: function (id) {
157 this._log.warn("Attempted to remove undefined ID to tracker");
160 if (this.ignoreAll || (id in this._ignored))
162 if (this.changedIDs[id] != null) {
163 this._log.trace("Removing changed ID " + id);
164 delete this.changedIDs[id];
165 this.saveChangedIDs();
170 clearChangedIDs: function () {
171 this._log.trace("Clearing changed ID list");
172 this.changedIDs = {};
173 this.saveChangedIDs();
178 // Override these in your subclasses.
179 startTracking: function () {
182 stopTracking: function () {
185 engineIsEnabled: function () {
187 // Can't tell -- we must be running in a test!
190 return this.engine.enabled;
193 onEngineEnabledChanged: function (engineEnabled) {
194 if (engineEnabled == this._isTracking) {
199 this.startTracking();
200 this._isTracking = true;
203 this._isTracking = false;
204 this.clearChangedIDs();
208 observe: function (subject, topic, data) {
210 case "weave:engine:start-tracking":
211 if (!this.engineIsEnabled()) {
214 this._log.trace("Got start-tracking.");
215 if (!this._isTracking) {
216 this.startTracking();
217 this._isTracking = true;
220 case "weave:engine:stop-tracking":
221 this._log.trace("Got stop-tracking.");
222 if (this._isTracking) {
224 this._isTracking = false;
234 * The Store serves as the interface between Sync and stored data.
236 * The name "store" is slightly a misnomer because it doesn't actually "store"
237 * anything. Instead, it serves as a gateway to something that actually does
240 * The store is responsible for record management inside an engine. It tells
241 * Sync what items are available for Sync, converts items to and from Sync's
242 * record format, and applies records from Sync into changes on the underlying
245 * Store implementations require a number of functions to be implemented. These
246 * are all documented below.
248 * For stores that deal with many records or which have expensive store access
249 * routines, it is highly recommended to implement a custom applyIncomingBatch
250 * and/or applyIncoming function on top of the basic APIs.
253 this.Store = function Store(name, engine) {
255 throw new Error("Store must be associated with an Engine instance.");
258 name = name || "Unnamed";
259 this.name = name.toLowerCase();
260 this.engine = engine;
262 this._log = Log.repository.getLogger("Sync.Store." + name);
263 let level = Svc.Prefs.get("log.logger.engine." + this.name, "Debug");
264 this._log.level = Log.Level[level];
266 XPCOMUtils.defineLazyGetter(this, "_timer", function() {
267 return Cc["@mozilla.org/timer;1"].createInstance(Ci.nsITimer);
272 _sleep: function _sleep(delay) {
273 let cb = Async.makeSyncCallback();
274 this._timer.initWithCallback(cb, delay, Ci.nsITimer.TYPE_ONE_SHOT);
275 Async.waitForSyncCallback(cb);
279 * Apply multiple incoming records against the store.
281 * This is called with a set of incoming records to process. The function
282 * should look at each record, reconcile with the current local state, and
283 * make the local changes required to bring its state in alignment with the
286 * The default implementation simply iterates over all records and calls
287 * applyIncoming(). Store implementations may overwrite this function
290 * @param records Array of records to apply
291 * @return Array of record IDs which did not apply cleanly
293 applyIncomingBatch: function (records) {
295 for each (let record in records) {
297 this.applyIncoming(record);
298 } catch (ex if (ex.code == Engine.prototype.eEngineAbortApplyIncoming)) {
299 // This kind of exception should have a 'cause' attribute, which is an
300 // originating exception.
301 // ex.cause will carry its stack with it when rethrown.
304 this._log.warn("Failed to apply incoming record " + record.id);
305 this._log.warn("Encountered exception: " + Utils.exceptionStr(ex));
306 failed.push(record.id);
313 * Apply a single record against the store.
315 * This takes a single record and makes the local changes required so the
316 * local state matches what's in the record.
318 * The default implementation calls one of remove(), create(), or update()
319 * depending on the state obtained from the store itself. Store
320 * implementations may overwrite this function if desired.
325 applyIncoming: function (record) {
328 else if (!this.itemExists(record.id))
334 // override these in derived objects
337 * Create an item in the store from a record.
339 * This is called by the default implementation of applyIncoming(). If using
340 * applyIncomingBatch(), this won't be called unless your store calls it.
343 * The store record to create an item from
345 create: function (record) {
346 throw "override create in a subclass";
350 * Remove an item in the store from a record.
352 * This is called by the default implementation of applyIncoming(). If using
353 * applyIncomingBatch(), this won't be called unless your store calls it.
356 * The store record to delete an item from
358 remove: function (record) {
359 throw "override remove in a subclass";
363 * Update an item from a record.
365 * This is called by the default implementation of applyIncoming(). If using
366 * applyIncomingBatch(), this won't be called unless your store calls it.
369 * The record to use to update an item from
371 update: function (record) {
372 throw "override update in a subclass";
376 * Determine whether a record with the specified ID exists.
378 * Takes a string record ID and returns a booleans saying whether the record
383 * @return boolean indicating whether record exists locally
385 itemExists: function (id) {
386 throw "override itemExists in a subclass";
390 * Create a record from the specified ID.
392 * If the ID is known, the record should be populated with metadata from
393 * the store. If the ID is not known, the record should be created with the
394 * delete field set to true.
399 * Collection to add record to. This is typically passed into the
400 * constructor for the newly-created record.
401 * @return record type for this engine
403 createRecord: function (id, collection) {
404 throw "override createRecord in a subclass";
408 * Change the ID of a record.
411 * string old/current record ID
413 * string new record ID
415 changeItemID: function (oldID, newID) {
416 throw "override changeItemID in a subclass";
420 * Obtain the set of all known record IDs.
422 * @return Object with ID strings as keys and values of true. The values
425 getAllIDs: function () {
426 throw "override getAllIDs in a subclass";
430 * Wipe all data in the store.
432 * This function is called during remote wipes or when replacing local data
435 * This function should delete all local data that the store is managing. It
436 * can be thought of as clearing out all state and restoring the "new
440 throw "override wipe in a subclass";
444 this.EngineManager = function EngineManager(service) {
445 this.service = service;
449 // This will be populated by Service on startup.
450 this._declined = new Set();
451 this._log = Log.repository.getLogger("Sync.EngineManager");
452 this._log.level = Log.Level[Svc.Prefs.get("log.logger.service.engines", "Debug")];
454 EngineManager.prototype = {
455 get: function (name) {
456 // Return an array of engines if we have an array of names
457 if (Array.isArray(name)) {
459 name.forEach(function(name) {
460 let engine = this.get(name);
462 engines.push(engine);
468 let engine = this._engines[name];
470 this._log.debug("Could not get engine: " + name);
472 this._log.debug("Engines are: " + JSON.stringify(Object.keys(this._engines)));
478 getAll: function () {
479 return [engine for ([name, engine] in Iterator(this._engines))];
483 * N.B., does not pay attention to the declined list.
485 getEnabled: function () {
486 return this.getAll().filter(function(engine) engine.enabled);
489 get enabledEngineNames() {
490 return [e.name for each (e in this.getEnabled())];
493 persistDeclined: function () {
494 Svc.Prefs.set("declinedEngines", [...this._declined].join(","));
500 getDeclined: function () {
501 return [...this._declined];
504 setDeclined: function (engines) {
505 this._declined = new Set(engines);
506 this.persistDeclined();
509 isDeclined: function (engineName) {
510 return this._declined.has(engineName);
514 * Accepts a Set or an array.
516 decline: function (engines) {
517 for (let e of engines) {
518 this._declined.add(e);
520 this.persistDeclined();
523 undecline: function (engines) {
524 for (let e of engines) {
525 this._declined.delete(e);
527 this.persistDeclined();
531 * Mark any non-enabled engines as declined.
533 * This is useful after initial customization during setup.
535 declineDisabled: function () {
536 for (let e of this.getAll()) {
538 this._log.debug("Declining disabled engine " + e.name);
539 this._declined.add(e.name);
542 this.persistDeclined();
546 * Register an Engine to the service. Alternatively, give an array of engine
547 * objects to register.
549 * @param engineObject
550 * Engine object used to get an instance of the engine
551 * @return The engine object if anything failed
553 register: function (engineObject) {
554 if (Array.isArray(engineObject)) {
555 return engineObject.map(this.register, this);
559 let engine = new engineObject(this.service);
560 let name = engine.name;
561 if (name in this._engines) {
562 this._log.error("Engine '" + name + "' is already registered!");
564 this._engines[name] = engine;
567 this._log.error(CommonUtils.exceptionStr(ex));
569 let mesg = ex.message ? ex.message : ex;
570 let name = engineObject || "";
571 name = name.prototype || "";
572 name = name.name || "";
574 let out = "Could not initialize engine '" + name + "': " + mesg;
575 this._log.error(out);
581 unregister: function (val) {
583 if (val instanceof Engine) {
586 delete this._engines[name];
590 for (let name in this._engines) {
591 delete this._engines[name];
596 this.Engine = function Engine(name, service) {
598 throw new Error("Engine must be associated with a Service instance.");
601 this.Name = name || "Unnamed";
602 this.name = name.toLowerCase();
603 this.service = service;
605 this._notify = Utils.notify("weave:engine:");
606 this._log = Log.repository.getLogger("Sync.Engine." + this.Name);
607 let level = Svc.Prefs.get("log.logger.engine." + this.name, "Debug");
608 this._log.level = Log.Level[level];
610 this._tracker; // initialize tracker to load previously changed IDs
611 this._log.debug("Engine initialized");
614 // _storeObj, and _trackerObj should to be overridden in subclasses
616 _trackerObj: Tracker,
619 // Signal to the engine that processing further records is pointless.
620 eEngineAbortApplyIncoming: "error.engine.abort.applyincoming",
622 get prefName() this.name,
624 return Svc.Prefs.get("engine." + this.prefName, false);
628 Svc.Prefs.set("engine." + this.prefName, !!val);
629 this._tracker.onEngineEnabledChanged(val);
632 get score() this._tracker.score,
635 let store = new this._storeObj(this.Name, this);
636 this.__defineGetter__("_store", function() store);
641 let tracker = new this._trackerObj(this.Name, this);
642 this.__defineGetter__("_tracker", function() tracker);
652 throw "engine does not implement _sync method";
655 this._notify("sync", this.name, this._sync)();
659 * Get rid of any local meta-data.
661 resetClient: function () {
662 if (!this._resetClient) {
663 throw "engine does not implement _resetClient method";
666 this._notify("reset-client", this.name, this._resetClient)();
669 _wipeClient: function () {
671 this._log.debug("Deleting all local data");
672 this._tracker.ignoreAll = true;
674 this._tracker.ignoreAll = false;
675 this._tracker.clearChangedIDs();
678 wipeClient: function () {
679 this._notify("wipe-client", this.name, this._wipeClient)();
683 this.SyncEngine = function SyncEngine(name, service) {
684 Engine.call(this, name || "SyncEngine", service);
687 this.loadPreviousFailed();
690 // Enumeration to define approaches to handling bad records.
691 // Attached to the constructor to allow use as a kind of static enumeration.
692 SyncEngine.kRecoveryStrategy = {
698 SyncEngine.prototype = {
699 __proto__: Engine.prototype,
700 _recordObj: CryptoWrapper,
703 // How many records to pull in a single sync. This is primarily to avoid very
704 // long first syncs against profiles with many history records.
707 // How many records to pull at one time when specifying IDs. This is to avoid
708 // URI length limitations.
709 guidFetchBatchSize: DEFAULT_GUID_FETCH_BATCH_SIZE,
710 mobileGUIDFetchBatchSize: DEFAULT_MOBILE_GUID_FETCH_BATCH_SIZE,
712 // How many records to process in a single batch.
713 applyIncomingBatchSize: DEFAULT_STORE_BATCH_SIZE,
715 get storageURL() this.service.storageURL,
717 get engineURL() this.storageURL + this.name,
719 get cryptoKeysURL() this.storageURL + "crypto/keys",
721 get metaURL() this.storageURL + "meta/global",
724 // Generate a random syncID if we don't have one
725 let syncID = Svc.Prefs.get(this.name + ".syncID", "");
726 return syncID == "" ? this.syncID = Utils.makeGUID() : syncID;
729 Svc.Prefs.set(this.name + ".syncID", value);
733 * lastSync is a timestamp in server time.
736 return parseFloat(Svc.Prefs.get(this.name + ".lastSync", "0"));
738 set lastSync(value) {
739 // Reset the pref in-case it's a number instead of a string
740 Svc.Prefs.reset(this.name + ".lastSync");
741 // Store the value as a string to keep floating point precision
742 Svc.Prefs.set(this.name + ".lastSync", value.toString());
744 resetLastSync: function () {
745 this._log.debug("Resetting " + this.name + " last sync time");
746 Svc.Prefs.reset(this.name + ".lastSync");
747 Svc.Prefs.set(this.name + ".lastSync", "0");
748 this.lastSyncLocal = 0;
751 get toFetch() this._toFetch,
753 let cb = (error) => this._log.error(Utils.exceptionStr(error));
754 // Coerce the array to a string for more efficient comparison.
755 if (val + "" == this._toFetch) {
759 Utils.namedTimer(function () {
760 Utils.jsonSave("toFetch/" + this.name, this, val, cb);
761 }, 0, this, "_toFetchDelay");
764 loadToFetch: function () {
765 // Initialize to empty if there's no file.
767 Utils.jsonLoad("toFetch/" + this.name, this, function(toFetch) {
769 this._toFetch = toFetch;
774 get previousFailed() this._previousFailed,
775 set previousFailed(val) {
776 let cb = (error) => this._log.error(Utils.exceptionStr(error));
777 // Coerce the array to a string for more efficient comparison.
778 if (val + "" == this._previousFailed) {
781 this._previousFailed = val;
782 Utils.namedTimer(function () {
783 Utils.jsonSave("failed/" + this.name, this, val, cb);
784 }, 0, this, "_previousFailedDelay");
787 loadPreviousFailed: function () {
788 // Initialize to empty if there's no file
789 this._previousFailed = [];
790 Utils.jsonLoad("failed/" + this.name, this, function(previousFailed) {
791 if (previousFailed) {
792 this._previousFailed = previousFailed;
798 * lastSyncLocal is a timestamp in local time.
800 get lastSyncLocal() {
801 return parseInt(Svc.Prefs.get(this.name + ".lastSyncLocal", "0"), 10);
803 set lastSyncLocal(value) {
804 // Store as a string because pref can only store C longs as numbers.
805 Svc.Prefs.set(this.name + ".lastSyncLocal", value.toString());
809 * Returns a mapping of IDs -> changed timestamp. Engine implementations
810 * can override this method to bypass the tracker for certain or all
813 getChangedIDs: function () {
814 return this._tracker.changedIDs;
817 // Create a new record using the store and add in crypto fields.
818 _createRecord: function (id) {
819 let record = this._store.createRecord(id, this.name);
821 record.collection = this.name;
825 // Any setup that needs to happen at the beginning of each sync.
826 _syncStartup: function () {
828 // Determine if we need to wipe on outdated versions
829 let metaGlobal = this.service.recordManager.get(this.metaURL);
830 let engines = metaGlobal.payload.engines || {};
831 let engineData = engines[this.name] || {};
833 let needsWipe = false;
835 // Assume missing versions are 0 and wipe the server
836 if ((engineData.version || 0) < this.version) {
837 this._log.debug("Old engine data: " + [engineData.version, this.version]);
839 // Prepare to clear the server and upload everything
843 // Set the newer version and newly generated syncID
844 engineData.version = this.version;
845 engineData.syncID = this.syncID;
847 // Put the new data back into meta/global and mark for upload
848 engines[this.name] = engineData;
849 metaGlobal.payload.engines = engines;
850 metaGlobal.changed = true;
852 // Don't sync this engine if the server has newer data
853 else if (engineData.version > this.version) {
854 let error = new String("New data: " + [engineData.version, this.version]);
855 error.failureCode = VERSION_OUT_OF_DATE;
858 // Changes to syncID mean we'll need to upload everything
859 else if (engineData.syncID != this.syncID) {
860 this._log.debug("Engine syncIDs: " + [engineData.syncID, this.syncID]);
861 this.syncID = engineData.syncID;
865 // Delete any existing data and reupload on bad version or missing meta.
866 // No crypto component here...? We could regenerate per-collection keys...
871 // Save objects that need to be uploaded in this._modified. We also save
872 // the timestamp of this fetch in this.lastSyncLocal. As we successfully
873 // upload objects we remove them from this._modified. If an error occurs
874 // or any objects fail to upload, they will remain in this._modified. At
875 // the end of a sync, or after an error, we add all objects remaining in
876 // this._modified to the tracker.
877 this.lastSyncLocal = Date.now();
879 this._modified = this.getChangedIDs();
881 // Mark all items to be uploaded, but treat them as changed from long ago
882 this._log.debug("First sync, uploading all items");
884 for (let id in this._store.getAllIDs()) {
885 this._modified[id] = 0;
888 // Clear the tracker now. If the sync fails we'll add the ones we failed
890 this._tracker.clearChangedIDs();
892 this._log.info(Object.keys(this._modified).length +
893 " outgoing items pre-reconciliation");
895 // Keep track of what to delete at the end of sync
900 * A tiny abstraction to make it easier to test incoming record
903 _itemSource: function () {
904 return new Collection(this.engineURL, this._recordObj, this.service);
908 * Process incoming records.
909 * In the most awful and untestable way possible.
910 * This now accepts something that makes testing vaguely less impossible.
912 _processIncoming: function (newitems) {
913 this._log.trace("Downloading & applying server changes");
915 // Figure out how many total items to fetch this sync; do less on mobile.
916 let batchSize = this.downloadLimit || Infinity;
917 let isMobile = (Svc.Prefs.get("client.type") == "mobile");
920 newitems = this._itemSource();
924 batchSize = MOBILE_BATCH_SIZE;
926 newitems.newer = this.lastSync;
927 newitems.full = true;
928 newitems.limit = batchSize;
930 // applied => number of items that should be applied.
931 // failed => number of items that failed in this sync.
932 // newFailed => number of items that failed for the first time in this sync.
933 // reconciled => number of items that were reconciled.
934 let count = {applied: 0, failed: 0, newFailed: 0, reconciled: 0};
938 let failedInPreviousSync = this.previousFailed;
939 let fetchBatch = Utils.arrayUnion(this.toFetch, failedInPreviousSync);
940 // Reset previousFailed for each sync since previously failed items may not fail again.
941 this.previousFailed = [];
943 // Used (via exceptions) to allow the record handler/reconciliation/etc.
944 // methods to signal that they would like processing of incoming records to
946 let aborting = undefined;
948 function doApplyBatch() {
949 this._tracker.ignoreAll = true;
951 failed = failed.concat(this._store.applyIncomingBatch(applyBatch));
953 // Catch any error that escapes from applyIncomingBatch. At present
954 // those will all be abort events.
955 this._log.warn("Got exception " + Utils.exceptionStr(ex) +
956 ", aborting processIncoming.");
959 this._tracker.ignoreAll = false;
963 function doApplyBatchAndPersistFailed() {
964 // Apply remaining batch.
965 if (applyBatch.length) {
966 doApplyBatch.call(this);
968 // Persist failed items so we refetch them.
970 this.previousFailed = Utils.arrayUnion(failed, this.previousFailed);
971 count.failed += failed.length;
972 this._log.debug("Records that failed to apply: " + failed);
977 let key = this.service.collectionKeys.keyForCollection(this.name);
979 // Not binding this method to 'this' for performance reasons. It gets
980 // called for every incoming record.
983 newitems.recordHandler = function(item) {
988 // Grab a later last modified if possible
989 if (self.lastModified == null || item.modified > self.lastModified)
990 self.lastModified = item.modified;
992 // Track the collection for the WBO.
993 item.collection = self.name;
995 // Remember which records were processed
996 handled.push(item.id);
1001 } catch (ex if Utils.isHMACMismatch(ex)) {
1002 let strategy = self.handleHMACMismatch(item, true);
1003 if (strategy == SyncEngine.kRecoveryStrategy.retry) {
1004 // You only get one retry.
1006 // Try decrypting again, typically because we've got new keys.
1007 self._log.info("Trying decrypt again...");
1008 key = self.service.collectionKeys.keyForCollection(self.name);
1011 } catch (ex if Utils.isHMACMismatch(ex)) {
1012 strategy = self.handleHMACMismatch(item, false);
1018 // Retry succeeded! No further handling.
1020 case SyncEngine.kRecoveryStrategy.retry:
1021 self._log.debug("Ignoring second retry suggestion.");
1022 // Fall through to error case.
1023 case SyncEngine.kRecoveryStrategy.error:
1024 self._log.warn("Error decrypting record: " + Utils.exceptionStr(ex));
1025 failed.push(item.id);
1027 case SyncEngine.kRecoveryStrategy.ignore:
1028 self._log.debug("Ignoring record " + item.id +
1029 " with bad HMAC: already handled.");
1034 self._log.warn("Error decrypting record: " + Utils.exceptionStr(ex));
1035 failed.push(item.id);
1041 shouldApply = self._reconcile(item);
1042 } catch (ex if (ex.code == Engine.prototype.eEngineAbortApplyIncoming)) {
1043 self._log.warn("Reconciliation failed: aborting incoming processing.");
1044 failed.push(item.id);
1045 aborting = ex.cause;
1047 self._log.warn("Failed to reconcile incoming record " + item.id);
1048 self._log.warn("Encountered exception: " + Utils.exceptionStr(ex));
1049 failed.push(item.id);
1055 applyBatch.push(item);
1058 self._log.trace("Skipping reconciled incoming item " + item.id);
1061 if (applyBatch.length == self.applyIncomingBatchSize) {
1062 doApplyBatch.call(self);
1064 self._store._sleep(0);
1067 // Only bother getting data from the server if there's new things
1068 if (this.lastModified == null || this.lastModified > this.lastSync) {
1069 let resp = newitems.get();
1070 doApplyBatchAndPersistFailed.call(this);
1071 if (!resp.success) {
1072 resp.failureCode = ENGINE_DOWNLOAD_FAIL;
1081 // Mobile: check if we got the maximum that we requested; get the rest if so.
1082 if (handled.length == newitems.limit) {
1083 let guidColl = new Collection(this.engineURL, null, this.service);
1085 // Sort and limit so that on mobile we only get the last X records.
1086 guidColl.limit = this.downloadLimit;
1087 guidColl.newer = this.lastSync;
1089 // index: Orders by the sortindex descending (highest weight first).
1090 guidColl.sort = "index";
1092 let guids = guidColl.get();
1096 // Figure out which guids weren't just fetched then remove any guids that
1097 // were already waiting and prepend the new ones
1098 let extra = Utils.arraySub(guids.obj, handled);
1099 if (extra.length > 0) {
1100 fetchBatch = Utils.arrayUnion(extra, fetchBatch);
1101 this.toFetch = Utils.arrayUnion(extra, this.toFetch);
1105 // Fast-foward the lastSync timestamp since we have stored the
1106 // remaining items in toFetch.
1107 if (this.lastSync < this.lastModified) {
1108 this.lastSync = this.lastModified;
1111 // Process any backlog of GUIDs.
1112 // At this point we impose an upper limit on the number of items to fetch
1113 // in a single request, even for desktop, to avoid hitting URI limits.
1114 batchSize = isMobile ? this.mobileGUIDFetchBatchSize :
1115 this.guidFetchBatchSize;
1117 while (fetchBatch.length && !aborting) {
1118 // Reuse the original query, but get rid of the restricting params
1119 // and batch remaining records.
1122 newitems.ids = fetchBatch.slice(0, batchSize);
1124 // Reuse the existing record handler set earlier
1125 let resp = newitems.get();
1126 if (!resp.success) {
1127 resp.failureCode = ENGINE_DOWNLOAD_FAIL;
1131 // This batch was successfully applied. Not using
1132 // doApplyBatchAndPersistFailed() here to avoid writing toFetch twice.
1133 fetchBatch = fetchBatch.slice(batchSize);
1134 this.toFetch = Utils.arraySub(this.toFetch, newitems.ids);
1135 this.previousFailed = Utils.arrayUnion(this.previousFailed, failed);
1136 if (failed.length) {
1137 count.failed += failed.length;
1138 this._log.debug("Records that failed to apply: " + failed);
1146 if (this.lastSync < this.lastModified) {
1147 this.lastSync = this.lastModified;
1151 // Apply remaining items.
1152 doApplyBatchAndPersistFailed.call(this);
1154 count.newFailed = Utils.arraySub(this.previousFailed, failedInPreviousSync).length;
1155 count.succeeded = Math.max(0, count.applied - count.failed);
1156 this._log.info(["Records:",
1157 count.applied, "applied,",
1158 count.succeeded, "successfully,",
1159 count.failed, "failed to apply,",
1160 count.newFailed, "newly failed to apply,",
1161 count.reconciled, "reconciled."].join(" "));
1162 Observers.notify("weave:engine:sync:applied", count, this.name);
1166 * Find a GUID of an item that is a duplicate of the incoming item but happens
1167 * to have a different GUID
1169 * @return GUID of the similar item; falsy otherwise
1171 _findDupe: function (item) {
1172 // By default, assume there's no dupe items for the engine
1175 _deleteId: function (id) {
1176 this._tracker.removeChangedID(id);
1178 // Remember this id to delete at the end of sync
1179 if (this._delete.ids == null)
1180 this._delete.ids = [id];
1182 this._delete.ids.push(id);
1186 * Reconcile incoming record with local state.
1188 * This function essentially determines whether to apply an incoming record.
1191 * Record from server to be tested for application.
1193 * Truthy if incoming record should be applied. False if not.
1195 _reconcile: function (item) {
1196 if (this._log.level <= Log.Level.Trace) {
1197 this._log.trace("Incoming: " + item);
1200 // We start reconciling by collecting a bunch of state. We do this here
1201 // because some state may change during the course of this function and we
1202 // need to operate on the original values.
1203 let existsLocally = this._store.itemExists(item.id);
1204 let locallyModified = item.id in this._modified;
1206 // TODO Handle clock drift better. Tracked in bug 721181.
1207 let remoteAge = AsyncResource.serverTime - item.modified;
1208 let localAge = locallyModified ?
1209 (Date.now() / 1000 - this._modified[item.id]) : null;
1210 let remoteIsNewer = remoteAge < localAge;
1212 this._log.trace("Reconciling " + item.id + ". exists=" +
1213 existsLocally + "; modified=" + locallyModified +
1214 "; local age=" + localAge + "; incoming age=" +
1217 // We handle deletions first so subsequent logic doesn't have to check
1220 // If the item doesn't exist locally, there is nothing for us to do. We
1221 // can't check for duplicates because the incoming record has no data
1222 // which can be used for duplicate detection.
1223 if (!existsLocally) {
1224 this._log.trace("Ignoring incoming item because it was deleted and " +
1225 "the item does not exist locally.");
1229 // We decide whether to process the deletion by comparing the record
1230 // ages. If the item is not modified locally, the remote side wins and
1231 // the deletion is processed. If it is modified locally, we take the
1233 if (!locallyModified) {
1234 this._log.trace("Applying incoming delete because the local item " +
1235 "exists and isn't modified.");
1239 // TODO As part of bug 720592, determine whether we should do more here.
1240 // In the case where the local changes are newer, it is quite possible
1241 // that the local client will restore data a remote client had tried to
1242 // delete. There might be a good reason for that delete and it might be
1243 // enexpected for this client to restore that data.
1244 this._log.trace("Incoming record is deleted but we had local changes. " +
1245 "Applying the youngest record.");
1246 return remoteIsNewer;
1249 // At this point the incoming record is not for a deletion and must have
1250 // data. If the incoming record does not exist locally, we check for a local
1251 // duplicate existing under a different ID. The default implementation of
1252 // _findDupe() is empty, so engines have to opt in to this functionality.
1254 // If we find a duplicate, we change the local ID to the incoming ID and we
1255 // refresh the metadata collected above. See bug 710448 for the history
1257 if (!existsLocally) {
1258 let dupeID = this._findDupe(item);
1260 this._log.trace("Local item " + dupeID + " is a duplicate for " +
1261 "incoming item " + item.id);
1263 // The local, duplicate ID is always deleted on the server.
1264 this._deleteId(dupeID);
1266 // The current API contract does not mandate that the ID returned by
1267 // _findDupe() actually exists. Therefore, we have to perform this
1269 existsLocally = this._store.itemExists(dupeID);
1271 // We unconditionally change the item's ID in case the engine knows of
1272 // an item but doesn't expose it through itemExists. If the API
1273 // contract were stronger, this could be changed.
1274 this._log.debug("Switching local ID to incoming: " + dupeID + " -> " +
1276 this._store.changeItemID(dupeID, item.id);
1278 // If the local item was modified, we carry its metadata forward so
1279 // appropriate reconciling can be performed.
1280 if (dupeID in this._modified) {
1281 locallyModified = true;
1282 localAge = Date.now() / 1000 - this._modified[dupeID];
1283 remoteIsNewer = remoteAge < localAge;
1285 this._modified[item.id] = this._modified[dupeID];
1286 delete this._modified[dupeID];
1288 locallyModified = false;
1292 this._log.debug("Local item after duplication: age=" + localAge +
1293 "; modified=" + locallyModified + "; exists=" +
1296 this._log.trace("No duplicate found for incoming item: " + item.id);
1300 // At this point we've performed duplicate detection. But, nothing here
1301 // should depend on duplicate detection as the above should have updated
1302 // state seamlessly.
1304 if (!existsLocally) {
1305 // If the item doesn't exist locally and we have no local modifications
1306 // to the item (implying that it was not deleted), always apply the remote
1308 if (!locallyModified) {
1309 this._log.trace("Applying incoming because local item does not exist " +
1310 "and was not deleted.");
1314 // If the item was modified locally but isn't present, it must have
1315 // been deleted. If the incoming record is younger, we restore from
1317 if (remoteIsNewer) {
1318 this._log.trace("Applying incoming because local item was deleted " +
1319 "before the incoming item was changed.");
1320 delete this._modified[item.id];
1324 this._log.trace("Ignoring incoming item because the local item's " +
1325 "deletion is newer.");
1329 // If the remote and local records are the same, there is nothing to be
1330 // done, so we don't do anything. In the ideal world, this logic wouldn't
1331 // be here and the engine would take a record and apply it. The reason we
1332 // want to defer this logic is because it would avoid a redundant and
1333 // possibly expensive dip into the storage layer to query item state.
1334 // This should get addressed in the async rewrite, so we ignore it for now.
1335 let localRecord = this._createRecord(item.id);
1336 let recordsEqual = Utils.deepEquals(item.cleartext,
1337 localRecord.cleartext);
1339 // If the records are the same, we don't need to do anything. This does
1340 // potentially throw away a local modification time. But, if the records
1341 // are the same, does it matter?
1343 this._log.trace("Ignoring incoming item because the local item is " +
1346 delete this._modified[item.id];
1350 // At this point the records are different.
1352 // If we have no local modifications, always take the server record.
1353 if (!locallyModified) {
1354 this._log.trace("Applying incoming record because no local conflicts.");
1358 // At this point, records are different and the local record is modified.
1359 // We resolve conflicts by record age, where the newest one wins. This does
1360 // result in data loss and should be handled by giving the engine an
1361 // opportunity to merge the records. Bug 720592 tracks this feature.
1362 this._log.warn("DATA LOSS: Both local and remote changes to record: " +
1364 return remoteIsNewer;
1367 // Upload outgoing records.
1368 _uploadOutgoing: function () {
1369 this._log.trace("Uploading local changes to server.");
1371 let modifiedIDs = Object.keys(this._modified);
1372 if (modifiedIDs.length) {
1373 this._log.trace("Preparing " + modifiedIDs.length +
1374 " outgoing records");
1376 // collection we'll upload
1377 let up = new Collection(this.engineURL, null, this.service);
1380 // Upload what we've got so far in the collection
1381 let doUpload = Utils.bind2(this, function(desc) {
1382 this._log.info("Uploading " + desc + " of " + modifiedIDs.length +
1384 let resp = up.post();
1385 if (!resp.success) {
1386 this._log.debug("Uploading records failed: " + resp);
1387 resp.failureCode = ENGINE_UPLOAD_FAIL;
1391 // Update server timestamp from the upload.
1392 let modified = resp.headers["x-weave-timestamp"];
1393 if (modified > this.lastSync)
1394 this.lastSync = modified;
1396 let failed_ids = Object.keys(resp.obj.failed);
1397 if (failed_ids.length)
1398 this._log.debug("Records that will be uploaded again because "
1399 + "the server couldn't store them: "
1400 + failed_ids.join(", "));
1402 // Clear successfully uploaded objects.
1403 for each (let id in resp.obj.success) {
1404 delete this._modified[id];
1410 for each (let id in modifiedIDs) {
1412 let out = this._createRecord(id);
1413 if (this._log.level <= Log.Level.Trace)
1414 this._log.trace("Outgoing: " + out);
1416 out.encrypt(this.service.collectionKeys.keyForCollection(this.name));
1420 this._log.warn("Error creating record: " + Utils.exceptionStr(ex));
1424 if ((++count % MAX_UPLOAD_RECORDS) == 0)
1425 doUpload((count - MAX_UPLOAD_RECORDS) + " - " + count + " out");
1427 this._store._sleep(0);
1431 if (count % MAX_UPLOAD_RECORDS > 0)
1432 doUpload(count >= MAX_UPLOAD_RECORDS ? "last batch" : "all");
1436 // Any cleanup necessary.
1437 // Save the current snapshot so as to calculate changes at next sync
1438 _syncFinish: function () {
1439 this._log.trace("Finishing up sync");
1440 this._tracker.resetScore();
1442 let doDelete = Utils.bind2(this, function(key, val) {
1443 let coll = new Collection(this.engineURL, this._recordObj, this.service);
1448 for (let [key, val] in Iterator(this._delete)) {
1449 // Remove the key for future uses
1450 delete this._delete[key];
1452 // Send a simple delete for the property
1453 if (key != "ids" || val.length <= 100)
1456 // For many ids, split into chunks of at most 100
1457 while (val.length > 0) {
1458 doDelete(key, val.slice(0, 100));
1459 val = val.slice(100);
1465 _syncCleanup: function () {
1466 if (!this._modified) {
1470 // Mark failed WBOs as changed again so they are reuploaded next time.
1471 for (let [id, when] in Iterator(this._modified)) {
1472 this._tracker.addChangedID(id, when);
1474 this._modified = {};
1477 _sync: function () {
1479 this._syncStartup();
1480 Observers.notify("weave:engine:sync:status", "process-incoming");
1481 this._processIncoming();
1482 Observers.notify("weave:engine:sync:status", "upload-outgoing");
1483 this._uploadOutgoing();
1486 this._syncCleanup();
1490 canDecrypt: function () {
1491 // Report failure even if there's nothing to decrypt
1492 let canDecrypt = false;
1494 // Fetch the most recently uploaded record and try to decrypt it
1495 let test = new Collection(this.engineURL, this._recordObj, this.service);
1497 test.sort = "newest";
1500 let key = this.service.collectionKeys.keyForCollection(this.name);
1501 test.recordHandler = function recordHandler(record) {
1502 record.decrypt(key);
1506 // Any failure fetching/decrypting will just result in false
1508 this._log.trace("Trying to decrypt a record from the server..");
1512 this._log.debug("Failed test decrypt: " + Utils.exceptionStr(ex));
1518 _resetClient: function () {
1519 this.resetLastSync();
1520 this.previousFailed = [];
1524 wipeServer: function () {
1525 let response = this.service.resource(this.engineURL).delete();
1526 if (response.status != 200 && response.status != 404) {
1529 this._resetClient();
1532 removeClientData: function () {
1533 // Implement this method in engines that store client specific data
1538 * Decide on (and partially effect) an error-handling strategy.
1540 * Asks the Service to respond to an HMAC error, which might result in keys
1541 * being downloaded. That call returns true if an action which might allow a
1544 * If `mayRetry` is truthy, and the Service suggests a retry,
1545 * handleHMACMismatch returns kRecoveryStrategy.retry. Otherwise, it returns
1546 * kRecoveryStrategy.error.
1548 * Subclasses of SyncEngine can override this method to allow for different
1549 * behavior -- e.g., to delete and ignore erroneous entries.
1551 * All return values will be part of the kRecoveryStrategy enumeration.
1553 handleHMACMismatch: function (item, mayRetry) {
1554 // By default we either try again, or bail out noisily.
1555 return (this.service.handleHMACEvent() && mayRetry) ?
1556 SyncEngine.kRecoveryStrategy.retry :
1557 SyncEngine.kRecoveryStrategy.error;