Bumping manifests a=b2g-bump
[gecko.git] / services / sync / modules / engines.js
blob73fc13d16fb4c994a956dc4b13b809c47b3aba5f
1 /* This Source Code Form is subject to the terms of the Mozilla Public
2  * License, v. 2.0. If a copy of the MPL was not distributed with this
3  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 this.EXPORTED_SYMBOLS = [
6   "EngineManager",
7   "Engine",
8   "SyncEngine",
9   "Tracker",
10   "Store"
13 const {classes: Cc, interfaces: Ci, results: Cr, utils: Cu} = Components;
15 Cu.import("resource://services-common/async.js");
16 Cu.import("resource://gre/modules/Log.jsm");
17 Cu.import("resource://services-common/observers.js");
18 Cu.import("resource://services-common/utils.js");
19 Cu.import("resource://services-sync/constants.js");
20 Cu.import("resource://services-sync/identity.js");
21 Cu.import("resource://services-sync/record.js");
22 Cu.import("resource://services-sync/resource.js");
23 Cu.import("resource://services-sync/util.js");
26  * Trackers are associated with a single engine and deal with
27  * listening for changes to their particular data type.
28  *
29  * There are two things they keep track of:
30  * 1) A score, indicating how urgently the engine wants to sync
31  * 2) A list of IDs for all the changed items that need to be synced
32  * and updating their 'score', indicating how urgently they
33  * want to sync.
34  *
35  */
36 this.Tracker = function Tracker(name, engine) {
37   if (!engine) {
38     throw new Error("Tracker must be associated with an Engine instance.");
39   }
41   name = name || "Unnamed";
42   this.name = this.file = name.toLowerCase();
43   this.engine = engine;
45   this._log = Log.repository.getLogger("Sync.Tracker." + name);
46   let level = Svc.Prefs.get("log.logger.engine." + this.name, "Debug");
47   this._log.level = Log.Level[level];
49   this._score = 0;
50   this._ignored = [];
51   this.ignoreAll = false;
52   this.changedIDs = {};
53   this.loadChangedIDs();
55   Svc.Obs.add("weave:engine:start-tracking", this);
56   Svc.Obs.add("weave:engine:stop-tracking", this);
59 Tracker.prototype = {
60   /*
61    * Score can be called as often as desired to decide which engines to sync
62    *
63    * Valid values for score:
64    * -1: Do not sync unless the user specifically requests it (almost disabled)
65    * 0: Nothing has changed
66    * 100: Please sync me ASAP!
67    *
68    * Setting it to other values should (but doesn't currently) throw an exception
69    */
70   get score() {
71     return this._score;
72   },
74   set score(value) {
75     this._score = value;
76     Observers.notify("weave:engine:score:updated", this.name);
77   },
79   // Should be called by service everytime a sync has been done for an engine
80   resetScore: function () {
81     this._score = 0;
82   },
84   persistChangedIDs: true,
86   /**
87    * Persist changedIDs to disk at a later date.
88    * Optionally pass a callback to be invoked when the write has occurred.
89    */
90   saveChangedIDs: function (cb) {
91     if (!this.persistChangedIDs) {
92       this._log.debug("Not saving changedIDs.");
93       return;
94     }
95     Utils.namedTimer(function () {
96       this._log.debug("Saving changed IDs to " + this.file);
97       Utils.jsonSave("changes/" + this.file, this, this.changedIDs, cb);
98     }, 1000, this, "_lazySave");
99   },
101   loadChangedIDs: function (cb) {
102     Utils.jsonLoad("changes/" + this.file, this, function(json) {
103       if (json && (typeof(json) == "object")) {
104         this.changedIDs = json;
105       } else {
106         this._log.warn("Changed IDs file " + this.file + " contains non-object value.");
107         json = null;
108       }
109       if (cb) {
110         cb.call(this, json);
111       }
112     });
113   },
115   // ignore/unignore specific IDs.  Useful for ignoring items that are
116   // being processed, or that shouldn't be synced.
117   // But note: not persisted to disk
119   ignoreID: function (id) {
120     this.unignoreID(id);
121     this._ignored.push(id);
122   },
124   unignoreID: function (id) {
125     let index = this._ignored.indexOf(id);
126     if (index != -1)
127       this._ignored.splice(index, 1);
128   },
130   addChangedID: function (id, when) {
131     if (!id) {
132       this._log.warn("Attempted to add undefined ID to tracker");
133       return false;
134     }
136     if (this.ignoreAll || (id in this._ignored)) {
137       return false;
138     }
140     // Default to the current time in seconds if no time is provided.
141     if (when == null) {
142       when = Math.floor(Date.now() / 1000);
143     }
145     // Add/update the entry if we have a newer time.
146     if ((this.changedIDs[id] || -Infinity) < when) {
147       this._log.trace("Adding changed ID: " + id + ", " + when);
148       this.changedIDs[id] = when;
149       this.saveChangedIDs(this.onSavedChangedIDs);
150     }
152     return true;
153   },
155   removeChangedID: function (id) {
156     if (!id) {
157       this._log.warn("Attempted to remove undefined ID to tracker");
158       return false;
159     }
160     if (this.ignoreAll || (id in this._ignored))
161       return false;
162     if (this.changedIDs[id] != null) {
163       this._log.trace("Removing changed ID " + id);
164       delete this.changedIDs[id];
165       this.saveChangedIDs();
166     }
167     return true;
168   },
170   clearChangedIDs: function () {
171     this._log.trace("Clearing changed ID list");
172     this.changedIDs = {};
173     this.saveChangedIDs();
174   },
176   _isTracking: false,
178   // Override these in your subclasses.
179   startTracking: function () {
180   },
182   stopTracking: function () {
183   },
185   engineIsEnabled: function () {
186     if (!this.engine) {
187       // Can't tell -- we must be running in a test!
188       return true;
189     }
190     return this.engine.enabled;
191   },
193   onEngineEnabledChanged: function (engineEnabled) {
194     if (engineEnabled == this._isTracking) {
195       return;
196     }
198     if (engineEnabled) {
199       this.startTracking();
200       this._isTracking = true;
201     } else {
202       this.stopTracking();
203       this._isTracking = false;
204       this.clearChangedIDs();
205     }
206   },
208   observe: function (subject, topic, data) {
209     switch (topic) {
210       case "weave:engine:start-tracking":
211         if (!this.engineIsEnabled()) {
212           return;
213         }
214         this._log.trace("Got start-tracking.");
215         if (!this._isTracking) {
216           this.startTracking();
217           this._isTracking = true;
218         }
219         return;
220       case "weave:engine:stop-tracking":
221         this._log.trace("Got stop-tracking.");
222         if (this._isTracking) {
223           this.stopTracking();
224           this._isTracking = false;
225         }
226         return;
227     }
228   }
234  * The Store serves as the interface between Sync and stored data.
236  * The name "store" is slightly a misnomer because it doesn't actually "store"
237  * anything. Instead, it serves as a gateway to something that actually does
238  * the "storing."
240  * The store is responsible for record management inside an engine. It tells
241  * Sync what items are available for Sync, converts items to and from Sync's
242  * record format, and applies records from Sync into changes on the underlying
243  * store.
245  * Store implementations require a number of functions to be implemented. These
246  * are all documented below.
248  * For stores that deal with many records or which have expensive store access
249  * routines, it is highly recommended to implement a custom applyIncomingBatch
250  * and/or applyIncoming function on top of the basic APIs.
251  */
253 this.Store = function Store(name, engine) {
254   if (!engine) {
255     throw new Error("Store must be associated with an Engine instance.");
256   }
258   name = name || "Unnamed";
259   this.name = name.toLowerCase();
260   this.engine = engine;
262   this._log = Log.repository.getLogger("Sync.Store." + name);
263   let level = Svc.Prefs.get("log.logger.engine." + this.name, "Debug");
264   this._log.level = Log.Level[level];
266   XPCOMUtils.defineLazyGetter(this, "_timer", function() {
267     return Cc["@mozilla.org/timer;1"].createInstance(Ci.nsITimer);
268   });
270 Store.prototype = {
272   _sleep: function _sleep(delay) {
273     let cb = Async.makeSyncCallback();
274     this._timer.initWithCallback(cb, delay, Ci.nsITimer.TYPE_ONE_SHOT);
275     Async.waitForSyncCallback(cb);
276   },
278   /**
279    * Apply multiple incoming records against the store.
280    *
281    * This is called with a set of incoming records to process. The function
282    * should look at each record, reconcile with the current local state, and
283    * make the local changes required to bring its state in alignment with the
284    * record.
285    *
286    * The default implementation simply iterates over all records and calls
287    * applyIncoming(). Store implementations may overwrite this function
288    * if desired.
289    *
290    * @param  records Array of records to apply
291    * @return Array of record IDs which did not apply cleanly
292    */
293   applyIncomingBatch: function (records) {
294     let failed = [];
295     for each (let record in records) {
296       try {
297         this.applyIncoming(record);
298       } catch (ex if (ex.code == Engine.prototype.eEngineAbortApplyIncoming)) {
299         // This kind of exception should have a 'cause' attribute, which is an
300         // originating exception.
301         // ex.cause will carry its stack with it when rethrown.
302         throw ex.cause;
303       } catch (ex) {
304         this._log.warn("Failed to apply incoming record " + record.id);
305         this._log.warn("Encountered exception: " + Utils.exceptionStr(ex));
306         failed.push(record.id);
307       }
308     };
309     return failed;
310   },
312   /**
313    * Apply a single record against the store.
314    *
315    * This takes a single record and makes the local changes required so the
316    * local state matches what's in the record.
317    *
318    * The default implementation calls one of remove(), create(), or update()
319    * depending on the state obtained from the store itself. Store
320    * implementations may overwrite this function if desired.
321    *
322    * @param record
323    *        Record to apply
324    */
325   applyIncoming: function (record) {
326     if (record.deleted)
327       this.remove(record);
328     else if (!this.itemExists(record.id))
329       this.create(record);
330     else
331       this.update(record);
332   },
334   // override these in derived objects
336   /**
337    * Create an item in the store from a record.
338    *
339    * This is called by the default implementation of applyIncoming(). If using
340    * applyIncomingBatch(), this won't be called unless your store calls it.
341    *
342    * @param record
343    *        The store record to create an item from
344    */
345   create: function (record) {
346     throw "override create in a subclass";
347   },
349   /**
350    * Remove an item in the store from a record.
351    *
352    * This is called by the default implementation of applyIncoming(). If using
353    * applyIncomingBatch(), this won't be called unless your store calls it.
354    *
355    * @param record
356    *        The store record to delete an item from
357    */
358   remove: function (record) {
359     throw "override remove in a subclass";
360   },
362   /**
363    * Update an item from a record.
364    *
365    * This is called by the default implementation of applyIncoming(). If using
366    * applyIncomingBatch(), this won't be called unless your store calls it.
367    *
368    * @param record
369    *        The record to use to update an item from
370    */
371   update: function (record) {
372     throw "override update in a subclass";
373   },
375   /**
376    * Determine whether a record with the specified ID exists.
377    *
378    * Takes a string record ID and returns a booleans saying whether the record
379    * exists.
380    *
381    * @param  id
382    *         string record ID
383    * @return boolean indicating whether record exists locally
384    */
385   itemExists: function (id) {
386     throw "override itemExists in a subclass";
387   },
389   /**
390    * Create a record from the specified ID.
391    *
392    * If the ID is known, the record should be populated with metadata from
393    * the store. If the ID is not known, the record should be created with the
394    * delete field set to true.
395    *
396    * @param  id
397    *         string record ID
398    * @param  collection
399    *         Collection to add record to. This is typically passed into the
400    *         constructor for the newly-created record.
401    * @return record type for this engine
402    */
403   createRecord: function (id, collection) {
404     throw "override createRecord in a subclass";
405   },
407   /**
408    * Change the ID of a record.
409    *
410    * @param  oldID
411    *         string old/current record ID
412    * @param  newID
413    *         string new record ID
414    */
415   changeItemID: function (oldID, newID) {
416     throw "override changeItemID in a subclass";
417   },
419   /**
420    * Obtain the set of all known record IDs.
421    *
422    * @return Object with ID strings as keys and values of true. The values
423    *         are ignored.
424    */
425   getAllIDs: function () {
426     throw "override getAllIDs in a subclass";
427   },
429   /**
430    * Wipe all data in the store.
431    *
432    * This function is called during remote wipes or when replacing local data
433    * with remote data.
434    *
435    * This function should delete all local data that the store is managing. It
436    * can be thought of as clearing out all state and restoring the "new
437    * browser" state.
438    */
439   wipe: function () {
440     throw "override wipe in a subclass";
441   }
444 this.EngineManager = function EngineManager(service) {
445   this.service = service;
447   this._engines = {};
449   // This will be populated by Service on startup.
450   this._declined = new Set();
451   this._log = Log.repository.getLogger("Sync.EngineManager");
452   this._log.level = Log.Level[Svc.Prefs.get("log.logger.service.engines", "Debug")];
454 EngineManager.prototype = {
455   get: function (name) {
456     // Return an array of engines if we have an array of names
457     if (Array.isArray(name)) {
458       let engines = [];
459       name.forEach(function(name) {
460         let engine = this.get(name);
461         if (engine) {
462           engines.push(engine);
463         }
464       }, this);
465       return engines;
466     }
468     let engine = this._engines[name];
469     if (!engine) {
470       this._log.debug("Could not get engine: " + name);
471       if (Object.keys) {
472         this._log.debug("Engines are: " + JSON.stringify(Object.keys(this._engines)));
473       }
474     }
475     return engine;
476   },
478   getAll: function () {
479     return [engine for ([name, engine] in Iterator(this._engines))];
480   },
482   /**
483    * N.B., does not pay attention to the declined list.
484    */
485   getEnabled: function () {
486     return this.getAll().filter(function(engine) engine.enabled);
487   },
489   get enabledEngineNames() {
490     return [e.name for each (e in this.getEnabled())];
491   },
493   persistDeclined: function () {
494     Svc.Prefs.set("declinedEngines", [...this._declined].join(","));
495   },
497   /**
498    * Returns an array.
499    */
500   getDeclined: function () {
501     return [...this._declined];
502   },
504   setDeclined: function (engines) {
505     this._declined = new Set(engines);
506     this.persistDeclined();
507   },
509   isDeclined: function (engineName) {
510     return this._declined.has(engineName);
511   },
513   /**
514    * Accepts a Set or an array.
515    */
516   decline: function (engines) {
517     for (let e of engines) {
518       this._declined.add(e);
519     }
520     this.persistDeclined();
521   },
523   undecline: function (engines) {
524     for (let e of engines) {
525       this._declined.delete(e);
526     }
527     this.persistDeclined();
528   },
530   /**
531    * Mark any non-enabled engines as declined.
532    *
533    * This is useful after initial customization during setup.
534    */
535   declineDisabled: function () {
536     for (let e of this.getAll()) {
537       if (!e.enabled) {
538         this._log.debug("Declining disabled engine " + e.name);
539         this._declined.add(e.name);
540       }
541     }
542     this.persistDeclined();
543   },
545   /**
546    * Register an Engine to the service. Alternatively, give an array of engine
547    * objects to register.
548    *
549    * @param engineObject
550    *        Engine object used to get an instance of the engine
551    * @return The engine object if anything failed
552    */
553   register: function (engineObject) {
554     if (Array.isArray(engineObject)) {
555       return engineObject.map(this.register, this);
556     }
558     try {
559       let engine = new engineObject(this.service);
560       let name = engine.name;
561       if (name in this._engines) {
562         this._log.error("Engine '" + name + "' is already registered!");
563       } else {
564         this._engines[name] = engine;
565       }
566     } catch (ex) {
567       this._log.error(CommonUtils.exceptionStr(ex));
569       let mesg = ex.message ? ex.message : ex;
570       let name = engineObject || "";
571       name = name.prototype || "";
572       name = name.name || "";
574       let out = "Could not initialize engine '" + name + "': " + mesg;
575       this._log.error(out);
577       return engineObject;
578     }
579   },
581   unregister: function (val) {
582     let name = val;
583     if (val instanceof Engine) {
584       name = val.name;
585     }
586     delete this._engines[name];
587   },
589   clear: function () {
590     for (let name in this._engines) {
591       delete this._engines[name];
592     }
593   },
596 this.Engine = function Engine(name, service) {
597   if (!service) {
598     throw new Error("Engine must be associated with a Service instance.");
599   }
601   this.Name = name || "Unnamed";
602   this.name = name.toLowerCase();
603   this.service = service;
605   this._notify = Utils.notify("weave:engine:");
606   this._log = Log.repository.getLogger("Sync.Engine." + this.Name);
607   let level = Svc.Prefs.get("log.logger.engine." + this.name, "Debug");
608   this._log.level = Log.Level[level];
610   this._tracker; // initialize tracker to load previously changed IDs
611   this._log.debug("Engine initialized");
613 Engine.prototype = {
614   // _storeObj, and _trackerObj should to be overridden in subclasses
615   _storeObj: Store,
616   _trackerObj: Tracker,
618   // Local 'constant'.
619   // Signal to the engine that processing further records is pointless.
620   eEngineAbortApplyIncoming: "error.engine.abort.applyincoming",
622   get prefName() this.name,
623   get enabled() {
624     return Svc.Prefs.get("engine." + this.prefName, false);
625   },
627   set enabled(val) {
628     Svc.Prefs.set("engine." + this.prefName, !!val);
629     this._tracker.onEngineEnabledChanged(val);
630   },
632   get score() this._tracker.score,
634   get _store() {
635     let store = new this._storeObj(this.Name, this);
636     this.__defineGetter__("_store", function() store);
637     return store;
638   },
640   get _tracker() {
641     let tracker = new this._trackerObj(this.Name, this);
642     this.__defineGetter__("_tracker", function() tracker);
643     return tracker;
644   },
646   sync: function () {
647     if (!this.enabled) {
648       return;
649     }
651     if (!this._sync) {
652       throw "engine does not implement _sync method";
653     }
655     this._notify("sync", this.name, this._sync)();
656   },
658   /**
659    * Get rid of any local meta-data.
660    */
661   resetClient: function () {
662     if (!this._resetClient) {
663       throw "engine does not implement _resetClient method";
664     }
666     this._notify("reset-client", this.name, this._resetClient)();
667   },
669   _wipeClient: function () {
670     this.resetClient();
671     this._log.debug("Deleting all local data");
672     this._tracker.ignoreAll = true;
673     this._store.wipe();
674     this._tracker.ignoreAll = false;
675     this._tracker.clearChangedIDs();
676   },
678   wipeClient: function () {
679     this._notify("wipe-client", this.name, this._wipeClient)();
680   }
683 this.SyncEngine = function SyncEngine(name, service) {
684   Engine.call(this, name || "SyncEngine", service);
686   this.loadToFetch();
687   this.loadPreviousFailed();
690 // Enumeration to define approaches to handling bad records.
691 // Attached to the constructor to allow use as a kind of static enumeration.
692 SyncEngine.kRecoveryStrategy = {
693   ignore: "ignore",
694   retry:  "retry",
695   error:  "error"
698 SyncEngine.prototype = {
699   __proto__: Engine.prototype,
700   _recordObj: CryptoWrapper,
701   version: 1,
703   // How many records to pull in a single sync. This is primarily to avoid very
704   // long first syncs against profiles with many history records.
705   downloadLimit: null,
707   // How many records to pull at one time when specifying IDs. This is to avoid
708   // URI length limitations.
709   guidFetchBatchSize: DEFAULT_GUID_FETCH_BATCH_SIZE,
710   mobileGUIDFetchBatchSize: DEFAULT_MOBILE_GUID_FETCH_BATCH_SIZE,
712   // How many records to process in a single batch.
713   applyIncomingBatchSize: DEFAULT_STORE_BATCH_SIZE,
715   get storageURL() this.service.storageURL,
717   get engineURL() this.storageURL + this.name,
719   get cryptoKeysURL() this.storageURL + "crypto/keys",
721   get metaURL() this.storageURL + "meta/global",
723   get syncID() {
724     // Generate a random syncID if we don't have one
725     let syncID = Svc.Prefs.get(this.name + ".syncID", "");
726     return syncID == "" ? this.syncID = Utils.makeGUID() : syncID;
727   },
728   set syncID(value) {
729     Svc.Prefs.set(this.name + ".syncID", value);
730   },
732   /*
733    * lastSync is a timestamp in server time.
734    */
735   get lastSync() {
736     return parseFloat(Svc.Prefs.get(this.name + ".lastSync", "0"));
737   },
738   set lastSync(value) {
739     // Reset the pref in-case it's a number instead of a string
740     Svc.Prefs.reset(this.name + ".lastSync");
741     // Store the value as a string to keep floating point precision
742     Svc.Prefs.set(this.name + ".lastSync", value.toString());
743   },
744   resetLastSync: function () {
745     this._log.debug("Resetting " + this.name + " last sync time");
746     Svc.Prefs.reset(this.name + ".lastSync");
747     Svc.Prefs.set(this.name + ".lastSync", "0");
748     this.lastSyncLocal = 0;
749   },
751   get toFetch() this._toFetch,
752   set toFetch(val) {
753     let cb = (error) => this._log.error(Utils.exceptionStr(error));
754     // Coerce the array to a string for more efficient comparison.
755     if (val + "" == this._toFetch) {
756       return;
757     }
758     this._toFetch = val;
759     Utils.namedTimer(function () {
760       Utils.jsonSave("toFetch/" + this.name, this, val, cb);
761     }, 0, this, "_toFetchDelay");
762   },
764   loadToFetch: function () {
765     // Initialize to empty if there's no file.
766     this._toFetch = [];
767     Utils.jsonLoad("toFetch/" + this.name, this, function(toFetch) {
768       if (toFetch) {
769         this._toFetch = toFetch;
770       }
771     });
772   },
774   get previousFailed() this._previousFailed,
775   set previousFailed(val) {
776     let cb = (error) => this._log.error(Utils.exceptionStr(error));
777     // Coerce the array to a string for more efficient comparison.
778     if (val + "" == this._previousFailed) {
779       return;
780     }
781     this._previousFailed = val;
782     Utils.namedTimer(function () {
783       Utils.jsonSave("failed/" + this.name, this, val, cb);
784     }, 0, this, "_previousFailedDelay");
785   },
787   loadPreviousFailed: function () {
788     // Initialize to empty if there's no file
789     this._previousFailed = [];
790     Utils.jsonLoad("failed/" + this.name, this, function(previousFailed) {
791       if (previousFailed) {
792         this._previousFailed = previousFailed;
793       }
794     });
795   },
797   /*
798    * lastSyncLocal is a timestamp in local time.
799    */
800   get lastSyncLocal() {
801     return parseInt(Svc.Prefs.get(this.name + ".lastSyncLocal", "0"), 10);
802   },
803   set lastSyncLocal(value) {
804     // Store as a string because pref can only store C longs as numbers.
805     Svc.Prefs.set(this.name + ".lastSyncLocal", value.toString());
806   },
808   /*
809    * Returns a mapping of IDs -> changed timestamp. Engine implementations
810    * can override this method to bypass the tracker for certain or all
811    * changed items.
812    */
813   getChangedIDs: function () {
814     return this._tracker.changedIDs;
815   },
817   // Create a new record using the store and add in crypto fields.
818   _createRecord: function (id) {
819     let record = this._store.createRecord(id, this.name);
820     record.id = id;
821     record.collection = this.name;
822     return record;
823   },
825   // Any setup that needs to happen at the beginning of each sync.
826   _syncStartup: function () {
828     // Determine if we need to wipe on outdated versions
829     let metaGlobal = this.service.recordManager.get(this.metaURL);
830     let engines = metaGlobal.payload.engines || {};
831     let engineData = engines[this.name] || {};
833     let needsWipe = false;
835     // Assume missing versions are 0 and wipe the server
836     if ((engineData.version || 0) < this.version) {
837       this._log.debug("Old engine data: " + [engineData.version, this.version]);
839       // Prepare to clear the server and upload everything
840       needsWipe = true;
841       this.syncID = "";
843       // Set the newer version and newly generated syncID
844       engineData.version = this.version;
845       engineData.syncID = this.syncID;
847       // Put the new data back into meta/global and mark for upload
848       engines[this.name] = engineData;
849       metaGlobal.payload.engines = engines;
850       metaGlobal.changed = true;
851     }
852     // Don't sync this engine if the server has newer data
853     else if (engineData.version > this.version) {
854       let error = new String("New data: " + [engineData.version, this.version]);
855       error.failureCode = VERSION_OUT_OF_DATE;
856       throw error;
857     }
858     // Changes to syncID mean we'll need to upload everything
859     else if (engineData.syncID != this.syncID) {
860       this._log.debug("Engine syncIDs: " + [engineData.syncID, this.syncID]);
861       this.syncID = engineData.syncID;
862       this._resetClient();
863     };
865     // Delete any existing data and reupload on bad version or missing meta.
866     // No crypto component here...? We could regenerate per-collection keys...
867     if (needsWipe) {
868       this.wipeServer();
869     }
871     // Save objects that need to be uploaded in this._modified. We also save
872     // the timestamp of this fetch in this.lastSyncLocal. As we successfully
873     // upload objects we remove them from this._modified. If an error occurs
874     // or any objects fail to upload, they will remain in this._modified. At
875     // the end of a sync, or after an error, we add all objects remaining in
876     // this._modified to the tracker.
877     this.lastSyncLocal = Date.now();
878     if (this.lastSync) {
879       this._modified = this.getChangedIDs();
880     } else {
881       // Mark all items to be uploaded, but treat them as changed from long ago
882       this._log.debug("First sync, uploading all items");
883       this._modified = {};
884       for (let id in this._store.getAllIDs()) {
885         this._modified[id] = 0;
886       }
887     }
888     // Clear the tracker now. If the sync fails we'll add the ones we failed
889     // to upload back.
890     this._tracker.clearChangedIDs();
892     this._log.info(Object.keys(this._modified).length +
893                    " outgoing items pre-reconciliation");
895     // Keep track of what to delete at the end of sync
896     this._delete = {};
897   },
899   /**
900    * A tiny abstraction to make it easier to test incoming record
901    * application.
902    */
903   _itemSource: function () {
904     return new Collection(this.engineURL, this._recordObj, this.service);
905   },
907   /**
908    * Process incoming records.
909    * In the most awful and untestable way possible.
910    * This now accepts something that makes testing vaguely less impossible.
911    */
912   _processIncoming: function (newitems) {
913     this._log.trace("Downloading & applying server changes");
915     // Figure out how many total items to fetch this sync; do less on mobile.
916     let batchSize = this.downloadLimit || Infinity;
917     let isMobile = (Svc.Prefs.get("client.type") == "mobile");
919     if (!newitems) {
920       newitems = this._itemSource();
921     }
923     if (isMobile) {
924       batchSize = MOBILE_BATCH_SIZE;
925     }
926     newitems.newer = this.lastSync;
927     newitems.full  = true;
928     newitems.limit = batchSize;
930     // applied    => number of items that should be applied.
931     // failed     => number of items that failed in this sync.
932     // newFailed  => number of items that failed for the first time in this sync.
933     // reconciled => number of items that were reconciled.
934     let count = {applied: 0, failed: 0, newFailed: 0, reconciled: 0};
935     let handled = [];
936     let applyBatch = [];
937     let failed = [];
938     let failedInPreviousSync = this.previousFailed;
939     let fetchBatch = Utils.arrayUnion(this.toFetch, failedInPreviousSync);
940     // Reset previousFailed for each sync since previously failed items may not fail again.
941     this.previousFailed = [];
943     // Used (via exceptions) to allow the record handler/reconciliation/etc.
944     // methods to signal that they would like processing of incoming records to
945     // cease.
946     let aborting = undefined;
948     function doApplyBatch() {
949       this._tracker.ignoreAll = true;
950       try {
951         failed = failed.concat(this._store.applyIncomingBatch(applyBatch));
952       } catch (ex) {
953         // Catch any error that escapes from applyIncomingBatch. At present
954         // those will all be abort events.
955         this._log.warn("Got exception " + Utils.exceptionStr(ex) +
956                        ", aborting processIncoming.");
957         aborting = ex;
958       }
959       this._tracker.ignoreAll = false;
960       applyBatch = [];
961     }
963     function doApplyBatchAndPersistFailed() {
964       // Apply remaining batch.
965       if (applyBatch.length) {
966         doApplyBatch.call(this);
967       }
968       // Persist failed items so we refetch them.
969       if (failed.length) {
970         this.previousFailed = Utils.arrayUnion(failed, this.previousFailed);
971         count.failed += failed.length;
972         this._log.debug("Records that failed to apply: " + failed);
973         failed = [];
974       }
975     }
977     let key = this.service.collectionKeys.keyForCollection(this.name);
979     // Not binding this method to 'this' for performance reasons. It gets
980     // called for every incoming record.
981     let self = this;
983     newitems.recordHandler = function(item) {
984       if (aborting) {
985         return;
986       }
988       // Grab a later last modified if possible
989       if (self.lastModified == null || item.modified > self.lastModified)
990         self.lastModified = item.modified;
992       // Track the collection for the WBO.
993       item.collection = self.name;
995       // Remember which records were processed
996       handled.push(item.id);
998       try {
999         try {
1000           item.decrypt(key);
1001         } catch (ex if Utils.isHMACMismatch(ex)) {
1002           let strategy = self.handleHMACMismatch(item, true);
1003           if (strategy == SyncEngine.kRecoveryStrategy.retry) {
1004             // You only get one retry.
1005             try {
1006               // Try decrypting again, typically because we've got new keys.
1007               self._log.info("Trying decrypt again...");
1008               key = self.service.collectionKeys.keyForCollection(self.name);
1009               item.decrypt(key);
1010               strategy = null;
1011             } catch (ex if Utils.isHMACMismatch(ex)) {
1012               strategy = self.handleHMACMismatch(item, false);
1013             }
1014           }
1016           switch (strategy) {
1017             case null:
1018               // Retry succeeded! No further handling.
1019               break;
1020             case SyncEngine.kRecoveryStrategy.retry:
1021               self._log.debug("Ignoring second retry suggestion.");
1022               // Fall through to error case.
1023             case SyncEngine.kRecoveryStrategy.error:
1024               self._log.warn("Error decrypting record: " + Utils.exceptionStr(ex));
1025               failed.push(item.id);
1026               return;
1027             case SyncEngine.kRecoveryStrategy.ignore:
1028               self._log.debug("Ignoring record " + item.id +
1029                               " with bad HMAC: already handled.");
1030               return;
1031           }
1032         }
1033       } catch (ex) {
1034         self._log.warn("Error decrypting record: " + Utils.exceptionStr(ex));
1035         failed.push(item.id);
1036         return;
1037       }
1039       let shouldApply;
1040       try {
1041         shouldApply = self._reconcile(item);
1042       } catch (ex if (ex.code == Engine.prototype.eEngineAbortApplyIncoming)) {
1043         self._log.warn("Reconciliation failed: aborting incoming processing.");
1044         failed.push(item.id);
1045         aborting = ex.cause;
1046       } catch (ex) {
1047         self._log.warn("Failed to reconcile incoming record " + item.id);
1048         self._log.warn("Encountered exception: " + Utils.exceptionStr(ex));
1049         failed.push(item.id);
1050         return;
1051       }
1053       if (shouldApply) {
1054         count.applied++;
1055         applyBatch.push(item);
1056       } else {
1057         count.reconciled++;
1058         self._log.trace("Skipping reconciled incoming item " + item.id);
1059       }
1061       if (applyBatch.length == self.applyIncomingBatchSize) {
1062         doApplyBatch.call(self);
1063       }
1064       self._store._sleep(0);
1065     };
1067     // Only bother getting data from the server if there's new things
1068     if (this.lastModified == null || this.lastModified > this.lastSync) {
1069       let resp = newitems.get();
1070       doApplyBatchAndPersistFailed.call(this);
1071       if (!resp.success) {
1072         resp.failureCode = ENGINE_DOWNLOAD_FAIL;
1073         throw resp;
1074       }
1076       if (aborting) {
1077         throw aborting;
1078       }
1079     }
1081     // Mobile: check if we got the maximum that we requested; get the rest if so.
1082     if (handled.length == newitems.limit) {
1083       let guidColl = new Collection(this.engineURL, null, this.service);
1085       // Sort and limit so that on mobile we only get the last X records.
1086       guidColl.limit = this.downloadLimit;
1087       guidColl.newer = this.lastSync;
1089       // index: Orders by the sortindex descending (highest weight first).
1090       guidColl.sort  = "index";
1092       let guids = guidColl.get();
1093       if (!guids.success)
1094         throw guids;
1096       // Figure out which guids weren't just fetched then remove any guids that
1097       // were already waiting and prepend the new ones
1098       let extra = Utils.arraySub(guids.obj, handled);
1099       if (extra.length > 0) {
1100         fetchBatch = Utils.arrayUnion(extra, fetchBatch);
1101         this.toFetch = Utils.arrayUnion(extra, this.toFetch);
1102       }
1103     }
1105     // Fast-foward the lastSync timestamp since we have stored the
1106     // remaining items in toFetch.
1107     if (this.lastSync < this.lastModified) {
1108       this.lastSync = this.lastModified;
1109     }
1111     // Process any backlog of GUIDs.
1112     // At this point we impose an upper limit on the number of items to fetch
1113     // in a single request, even for desktop, to avoid hitting URI limits.
1114     batchSize = isMobile ? this.mobileGUIDFetchBatchSize :
1115                            this.guidFetchBatchSize;
1117     while (fetchBatch.length && !aborting) {
1118       // Reuse the original query, but get rid of the restricting params
1119       // and batch remaining records.
1120       newitems.limit = 0;
1121       newitems.newer = 0;
1122       newitems.ids = fetchBatch.slice(0, batchSize);
1124       // Reuse the existing record handler set earlier
1125       let resp = newitems.get();
1126       if (!resp.success) {
1127         resp.failureCode = ENGINE_DOWNLOAD_FAIL;
1128         throw resp;
1129       }
1131       // This batch was successfully applied. Not using
1132       // doApplyBatchAndPersistFailed() here to avoid writing toFetch twice.
1133       fetchBatch = fetchBatch.slice(batchSize);
1134       this.toFetch = Utils.arraySub(this.toFetch, newitems.ids);
1135       this.previousFailed = Utils.arrayUnion(this.previousFailed, failed);
1136       if (failed.length) {
1137         count.failed += failed.length;
1138         this._log.debug("Records that failed to apply: " + failed);
1139       }
1140       failed = [];
1142       if (aborting) {
1143         throw aborting;
1144       }
1146       if (this.lastSync < this.lastModified) {
1147         this.lastSync = this.lastModified;
1148       }
1149     }
1151     // Apply remaining items.
1152     doApplyBatchAndPersistFailed.call(this);
1154     count.newFailed = Utils.arraySub(this.previousFailed, failedInPreviousSync).length;
1155     count.succeeded = Math.max(0, count.applied - count.failed);
1156     this._log.info(["Records:",
1157                     count.applied, "applied,",
1158                     count.succeeded, "successfully,",
1159                     count.failed, "failed to apply,",
1160                     count.newFailed, "newly failed to apply,",
1161                     count.reconciled, "reconciled."].join(" "));
1162     Observers.notify("weave:engine:sync:applied", count, this.name);
1163   },
1165   /**
1166    * Find a GUID of an item that is a duplicate of the incoming item but happens
1167    * to have a different GUID
1168    *
1169    * @return GUID of the similar item; falsy otherwise
1170    */
1171   _findDupe: function (item) {
1172     // By default, assume there's no dupe items for the engine
1173   },
1175   _deleteId: function (id) {
1176     this._tracker.removeChangedID(id);
1178     // Remember this id to delete at the end of sync
1179     if (this._delete.ids == null)
1180       this._delete.ids = [id];
1181     else
1182       this._delete.ids.push(id);
1183   },
1185   /**
1186    * Reconcile incoming record with local state.
1187    *
1188    * This function essentially determines whether to apply an incoming record.
1189    *
1190    * @param  item
1191    *         Record from server to be tested for application.
1192    * @return boolean
1193    *         Truthy if incoming record should be applied. False if not.
1194    */
1195   _reconcile: function (item) {
1196     if (this._log.level <= Log.Level.Trace) {
1197       this._log.trace("Incoming: " + item);
1198     }
1200     // We start reconciling by collecting a bunch of state. We do this here
1201     // because some state may change during the course of this function and we
1202     // need to operate on the original values.
1203     let existsLocally   = this._store.itemExists(item.id);
1204     let locallyModified = item.id in this._modified;
1206     // TODO Handle clock drift better. Tracked in bug 721181.
1207     let remoteAge = AsyncResource.serverTime - item.modified;
1208     let localAge  = locallyModified ?
1209       (Date.now() / 1000 - this._modified[item.id]) : null;
1210     let remoteIsNewer = remoteAge < localAge;
1212     this._log.trace("Reconciling " + item.id + ". exists=" +
1213                     existsLocally + "; modified=" + locallyModified +
1214                     "; local age=" + localAge + "; incoming age=" +
1215                     remoteAge);
1217     // We handle deletions first so subsequent logic doesn't have to check
1218     // deleted flags.
1219     if (item.deleted) {
1220       // If the item doesn't exist locally, there is nothing for us to do. We
1221       // can't check for duplicates because the incoming record has no data
1222       // which can be used for duplicate detection.
1223       if (!existsLocally) {
1224         this._log.trace("Ignoring incoming item because it was deleted and " +
1225                         "the item does not exist locally.");
1226         return false;
1227       }
1229       // We decide whether to process the deletion by comparing the record
1230       // ages. If the item is not modified locally, the remote side wins and
1231       // the deletion is processed. If it is modified locally, we take the
1232       // newer record.
1233       if (!locallyModified) {
1234         this._log.trace("Applying incoming delete because the local item " +
1235                         "exists and isn't modified.");
1236         return true;
1237       }
1239       // TODO As part of bug 720592, determine whether we should do more here.
1240       // In the case where the local changes are newer, it is quite possible
1241       // that the local client will restore data a remote client had tried to
1242       // delete. There might be a good reason for that delete and it might be
1243       // enexpected for this client to restore that data.
1244       this._log.trace("Incoming record is deleted but we had local changes. " +
1245                       "Applying the youngest record.");
1246       return remoteIsNewer;
1247     }
1249     // At this point the incoming record is not for a deletion and must have
1250     // data. If the incoming record does not exist locally, we check for a local
1251     // duplicate existing under a different ID. The default implementation of
1252     // _findDupe() is empty, so engines have to opt in to this functionality.
1253     //
1254     // If we find a duplicate, we change the local ID to the incoming ID and we
1255     // refresh the metadata collected above. See bug 710448 for the history
1256     // of this logic.
1257     if (!existsLocally) {
1258       let dupeID = this._findDupe(item);
1259       if (dupeID) {
1260         this._log.trace("Local item " + dupeID + " is a duplicate for " +
1261                         "incoming item " + item.id);
1263         // The local, duplicate ID is always deleted on the server.
1264         this._deleteId(dupeID);
1266         // The current API contract does not mandate that the ID returned by
1267         // _findDupe() actually exists. Therefore, we have to perform this
1268         // check.
1269         existsLocally = this._store.itemExists(dupeID);
1271         // We unconditionally change the item's ID in case the engine knows of
1272         // an item but doesn't expose it through itemExists. If the API
1273         // contract were stronger, this could be changed.
1274         this._log.debug("Switching local ID to incoming: " + dupeID + " -> " +
1275                         item.id);
1276         this._store.changeItemID(dupeID, item.id);
1278         // If the local item was modified, we carry its metadata forward so
1279         // appropriate reconciling can be performed.
1280         if (dupeID in this._modified) {
1281           locallyModified = true;
1282           localAge = Date.now() / 1000 - this._modified[dupeID];
1283           remoteIsNewer = remoteAge < localAge;
1285           this._modified[item.id] = this._modified[dupeID];
1286           delete this._modified[dupeID];
1287         } else {
1288           locallyModified = false;
1289           localAge = null;
1290         }
1292         this._log.debug("Local item after duplication: age=" + localAge +
1293                         "; modified=" + locallyModified + "; exists=" +
1294                         existsLocally);
1295       } else {
1296         this._log.trace("No duplicate found for incoming item: " + item.id);
1297       }
1298     }
1300     // At this point we've performed duplicate detection. But, nothing here
1301     // should depend on duplicate detection as the above should have updated
1302     // state seamlessly.
1304     if (!existsLocally) {
1305       // If the item doesn't exist locally and we have no local modifications
1306       // to the item (implying that it was not deleted), always apply the remote
1307       // item.
1308       if (!locallyModified) {
1309         this._log.trace("Applying incoming because local item does not exist " +
1310                         "and was not deleted.");
1311         return true;
1312       }
1314       // If the item was modified locally but isn't present, it must have
1315       // been deleted. If the incoming record is younger, we restore from
1316       // that record.
1317       if (remoteIsNewer) {
1318         this._log.trace("Applying incoming because local item was deleted " +
1319                         "before the incoming item was changed.");
1320         delete this._modified[item.id];
1321         return true;
1322       }
1324       this._log.trace("Ignoring incoming item because the local item's " +
1325                       "deletion is newer.");
1326       return false;
1327     }
1329     // If the remote and local records are the same, there is nothing to be
1330     // done, so we don't do anything. In the ideal world, this logic wouldn't
1331     // be here and the engine would take a record and apply it. The reason we
1332     // want to defer this logic is because it would avoid a redundant and
1333     // possibly expensive dip into the storage layer to query item state.
1334     // This should get addressed in the async rewrite, so we ignore it for now.
1335     let localRecord = this._createRecord(item.id);
1336     let recordsEqual = Utils.deepEquals(item.cleartext,
1337                                         localRecord.cleartext);
1339     // If the records are the same, we don't need to do anything. This does
1340     // potentially throw away a local modification time. But, if the records
1341     // are the same, does it matter?
1342     if (recordsEqual) {
1343       this._log.trace("Ignoring incoming item because the local item is " +
1344                       "identical.");
1346       delete this._modified[item.id];
1347       return false;
1348     }
1350     // At this point the records are different.
1352     // If we have no local modifications, always take the server record.
1353     if (!locallyModified) {
1354       this._log.trace("Applying incoming record because no local conflicts.");
1355       return true;
1356     }
1358     // At this point, records are different and the local record is modified.
1359     // We resolve conflicts by record age, where the newest one wins. This does
1360     // result in data loss and should be handled by giving the engine an
1361     // opportunity to merge the records. Bug 720592 tracks this feature.
1362     this._log.warn("DATA LOSS: Both local and remote changes to record: " +
1363                    item.id);
1364     return remoteIsNewer;
1365   },
1367   // Upload outgoing records.
1368   _uploadOutgoing: function () {
1369     this._log.trace("Uploading local changes to server.");
1371     let modifiedIDs = Object.keys(this._modified);
1372     if (modifiedIDs.length) {
1373       this._log.trace("Preparing " + modifiedIDs.length +
1374                       " outgoing records");
1376       // collection we'll upload
1377       let up = new Collection(this.engineURL, null, this.service);
1378       let count = 0;
1380       // Upload what we've got so far in the collection
1381       let doUpload = Utils.bind2(this, function(desc) {
1382         this._log.info("Uploading " + desc + " of " + modifiedIDs.length +
1383                        " records");
1384         let resp = up.post();
1385         if (!resp.success) {
1386           this._log.debug("Uploading records failed: " + resp);
1387           resp.failureCode = ENGINE_UPLOAD_FAIL;
1388           throw resp;
1389         }
1391         // Update server timestamp from the upload.
1392         let modified = resp.headers["x-weave-timestamp"];
1393         if (modified > this.lastSync)
1394           this.lastSync = modified;
1396         let failed_ids = Object.keys(resp.obj.failed);
1397         if (failed_ids.length)
1398           this._log.debug("Records that will be uploaded again because "
1399                           + "the server couldn't store them: "
1400                           + failed_ids.join(", "));
1402         // Clear successfully uploaded objects.
1403         for each (let id in resp.obj.success) {
1404           delete this._modified[id];
1405         }
1407         up.clearRecords();
1408       });
1410       for each (let id in modifiedIDs) {
1411         try {
1412           let out = this._createRecord(id);
1413           if (this._log.level <= Log.Level.Trace)
1414             this._log.trace("Outgoing: " + out);
1416           out.encrypt(this.service.collectionKeys.keyForCollection(this.name));
1417           up.pushData(out);
1418         }
1419         catch(ex) {
1420           this._log.warn("Error creating record: " + Utils.exceptionStr(ex));
1421         }
1423         // Partial upload
1424         if ((++count % MAX_UPLOAD_RECORDS) == 0)
1425           doUpload((count - MAX_UPLOAD_RECORDS) + " - " + count + " out");
1427         this._store._sleep(0);
1428       }
1430       // Final upload
1431       if (count % MAX_UPLOAD_RECORDS > 0)
1432         doUpload(count >= MAX_UPLOAD_RECORDS ? "last batch" : "all");
1433     }
1434   },
1436   // Any cleanup necessary.
1437   // Save the current snapshot so as to calculate changes at next sync
1438   _syncFinish: function () {
1439     this._log.trace("Finishing up sync");
1440     this._tracker.resetScore();
1442     let doDelete = Utils.bind2(this, function(key, val) {
1443       let coll = new Collection(this.engineURL, this._recordObj, this.service);
1444       coll[key] = val;
1445       coll.delete();
1446     });
1448     for (let [key, val] in Iterator(this._delete)) {
1449       // Remove the key for future uses
1450       delete this._delete[key];
1452       // Send a simple delete for the property
1453       if (key != "ids" || val.length <= 100)
1454         doDelete(key, val);
1455       else {
1456         // For many ids, split into chunks of at most 100
1457         while (val.length > 0) {
1458           doDelete(key, val.slice(0, 100));
1459           val = val.slice(100);
1460         }
1461       }
1462     }
1463   },
1465   _syncCleanup: function () {
1466     if (!this._modified) {
1467       return;
1468     }
1470     // Mark failed WBOs as changed again so they are reuploaded next time.
1471     for (let [id, when] in Iterator(this._modified)) {
1472       this._tracker.addChangedID(id, when);
1473     }
1474     this._modified = {};
1475   },
1477   _sync: function () {
1478     try {
1479       this._syncStartup();
1480       Observers.notify("weave:engine:sync:status", "process-incoming");
1481       this._processIncoming();
1482       Observers.notify("weave:engine:sync:status", "upload-outgoing");
1483       this._uploadOutgoing();
1484       this._syncFinish();
1485     } finally {
1486       this._syncCleanup();
1487     }
1488   },
1490   canDecrypt: function () {
1491     // Report failure even if there's nothing to decrypt
1492     let canDecrypt = false;
1494     // Fetch the most recently uploaded record and try to decrypt it
1495     let test = new Collection(this.engineURL, this._recordObj, this.service);
1496     test.limit = 1;
1497     test.sort = "newest";
1498     test.full = true;
1500     let key = this.service.collectionKeys.keyForCollection(this.name);
1501     test.recordHandler = function recordHandler(record) {
1502       record.decrypt(key);
1503       canDecrypt = true;
1504     }.bind(this);
1506     // Any failure fetching/decrypting will just result in false
1507     try {
1508       this._log.trace("Trying to decrypt a record from the server..");
1509       test.get();
1510     }
1511     catch(ex) {
1512       this._log.debug("Failed test decrypt: " + Utils.exceptionStr(ex));
1513     }
1515     return canDecrypt;
1516   },
1518   _resetClient: function () {
1519     this.resetLastSync();
1520     this.previousFailed = [];
1521     this.toFetch = [];
1522   },
1524   wipeServer: function () {
1525     let response = this.service.resource(this.engineURL).delete();
1526     if (response.status != 200 && response.status != 404) {
1527       throw response;
1528     }
1529     this._resetClient();
1530   },
1532   removeClientData: function () {
1533     // Implement this method in engines that store client specific data
1534     // on the server.
1535   },
1537   /*
1538    * Decide on (and partially effect) an error-handling strategy.
1539    *
1540    * Asks the Service to respond to an HMAC error, which might result in keys
1541    * being downloaded. That call returns true if an action which might allow a
1542    * retry to occur.
1543    *
1544    * If `mayRetry` is truthy, and the Service suggests a retry,
1545    * handleHMACMismatch returns kRecoveryStrategy.retry. Otherwise, it returns
1546    * kRecoveryStrategy.error.
1547    *
1548    * Subclasses of SyncEngine can override this method to allow for different
1549    * behavior -- e.g., to delete and ignore erroneous entries.
1550    *
1551    * All return values will be part of the kRecoveryStrategy enumeration.
1552    */
1553   handleHMACMismatch: function (item, mayRetry) {
1554     // By default we either try again, or bail out noisily.
1555     return (this.service.handleHMACEvent() && mayRetry) ?
1556            SyncEngine.kRecoveryStrategy.retry :
1557            SyncEngine.kRecoveryStrategy.error;
1558   }