Bug 1787269 [wpt PR 35624] - Further refine STP download regexs, a=testonly
[gecko.git] / dom / push / PushServiceWebSocket.jsm
blob7e9805ba7410441f519c4fd8e9dddc8f0c0ab8dd
1 /* This Source Code Form is subject to the terms of the Mozilla Public
2  * License, v. 2.0. If a copy of the MPL was not distributed with this file,
3  * You can obtain one at http://mozilla.org/MPL/2.0/. */
5 "use strict";
7 const { XPCOMUtils } = ChromeUtils.importESModule(
8   "resource://gre/modules/XPCOMUtils.sys.mjs"
9 );
11 const { PushDB } = ChromeUtils.import("resource://gre/modules/PushDB.jsm");
12 const { PushRecord } = ChromeUtils.import(
13   "resource://gre/modules/PushRecord.jsm"
15 const { PushCrypto } = ChromeUtils.import(
16   "resource://gre/modules/PushCrypto.jsm"
19 const lazy = {};
21 ChromeUtils.defineModuleGetter(
22   lazy,
23   "pushBroadcastService",
24   "resource://gre/modules/PushBroadcastService.jsm"
26 ChromeUtils.defineModuleGetter(
27   lazy,
28   "ObjectUtils",
29   "resource://gre/modules/ObjectUtils.jsm"
32 const kPUSHWSDB_DB_NAME = "pushapi";
33 const kPUSHWSDB_DB_VERSION = 5; // Change this if the IndexedDB format changes
34 const kPUSHWSDB_STORE_NAME = "pushapi";
36 // WebSocket close code sent by the server to indicate that the client should
37 // not automatically reconnect.
38 const kBACKOFF_WS_STATUS_CODE = 4774;
40 // Maps ack statuses, unsubscribe reasons, and delivery error reasons to codes
41 // included in request payloads.
42 const kACK_STATUS_TO_CODE = {
43   [Ci.nsIPushErrorReporter.ACK_DELIVERED]: 100,
44   [Ci.nsIPushErrorReporter.ACK_DECRYPTION_ERROR]: 101,
45   [Ci.nsIPushErrorReporter.ACK_NOT_DELIVERED]: 102,
48 const kUNREGISTER_REASON_TO_CODE = {
49   [Ci.nsIPushErrorReporter.UNSUBSCRIBE_MANUAL]: 200,
50   [Ci.nsIPushErrorReporter.UNSUBSCRIBE_QUOTA_EXCEEDED]: 201,
51   [Ci.nsIPushErrorReporter.UNSUBSCRIBE_PERMISSION_REVOKED]: 202,
54 const kDELIVERY_REASON_TO_CODE = {
55   [Ci.nsIPushErrorReporter.DELIVERY_UNCAUGHT_EXCEPTION]: 301,
56   [Ci.nsIPushErrorReporter.DELIVERY_UNHANDLED_REJECTION]: 302,
57   [Ci.nsIPushErrorReporter.DELIVERY_INTERNAL_ERROR]: 303,
60 const prefs = Services.prefs.getBranch("dom.push.");
62 const EXPORTED_SYMBOLS = ["PushServiceWebSocket"];
64 XPCOMUtils.defineLazyGetter(lazy, "console", () => {
65   let { ConsoleAPI } = ChromeUtils.import("resource://gre/modules/Console.jsm");
66   return new ConsoleAPI({
67     maxLogLevelPref: "dom.push.loglevel",
68     prefix: "PushServiceWebSocket",
69   });
70 });
72 /**
73  * A proxy between the PushService and the WebSocket. The listener is used so
74  * that the PushService can silence messages from the WebSocket by setting
75  * PushWebSocketListener._pushService to null. This is required because
76  * a WebSocket can continue to send messages or errors after it has been
77  * closed but the PushService may not be interested in these. It's easier to
78  * stop listening than to have checks at specific points.
79  */
80 var PushWebSocketListener = function(pushService) {
81   this._pushService = pushService;
84 PushWebSocketListener.prototype = {
85   onStart(context) {
86     if (!this._pushService) {
87       return;
88     }
89     this._pushService._wsOnStart(context);
90   },
92   onStop(context, statusCode) {
93     if (!this._pushService) {
94       return;
95     }
96     this._pushService._wsOnStop(context, statusCode);
97   },
99   onAcknowledge(context, size) {
100     // EMPTY
101   },
103   onBinaryMessageAvailable(context, message) {
104     // EMPTY
105   },
107   onMessageAvailable(context, message) {
108     if (!this._pushService) {
109       return;
110     }
111     this._pushService._wsOnMessageAvailable(context, message);
112   },
114   onServerClose(context, aStatusCode, aReason) {
115     if (!this._pushService) {
116       return;
117     }
118     this._pushService._wsOnServerClose(context, aStatusCode, aReason);
119   },
122 // websocket states
123 // websocket is off
124 const STATE_SHUT_DOWN = 0;
125 // Websocket has been opened on client side, waiting for successful open.
126 // (_wsOnStart)
127 const STATE_WAITING_FOR_WS_START = 1;
128 // Websocket opened, hello sent, waiting for server reply (_handleHelloReply).
129 const STATE_WAITING_FOR_HELLO = 2;
130 // Websocket operational, handshake completed, begin protocol messaging.
131 const STATE_READY = 3;
133 var PushServiceWebSocket = {
134   _mainPushService: null,
135   _serverURI: null,
136   _currentlyRegistering: new Set(),
138   newPushDB() {
139     return new PushDB(
140       kPUSHWSDB_DB_NAME,
141       kPUSHWSDB_DB_VERSION,
142       kPUSHWSDB_STORE_NAME,
143       "channelID",
144       PushRecordWebSocket
145     );
146   },
148   disconnect() {
149     this._shutdownWS();
150   },
152   observe(aSubject, aTopic, aData) {
153     if (aTopic == "nsPref:changed" && aData == "dom.push.userAgentID") {
154       this._onUAIDChanged();
155     } else if (aTopic == "timer-callback") {
156       this._onTimerFired(aSubject);
157     }
158   },
160   /**
161    * Handles a UAID change. Unlike reconnects, we cancel all pending requests
162    * after disconnecting. Existing subscriptions stored in IndexedDB will be
163    * dropped on reconnect.
164    */
165   _onUAIDChanged() {
166     lazy.console.debug("onUAIDChanged()");
168     this._shutdownWS();
169     this._startBackoffTimer();
170   },
172   /** Handles a ping, backoff, or request timeout timer event. */
173   _onTimerFired(timer) {
174     lazy.console.debug("onTimerFired()");
176     if (timer == this._pingTimer) {
177       this._sendPing();
178       return;
179     }
181     if (timer == this._backoffTimer) {
182       lazy.console.debug("onTimerFired: Reconnecting after backoff");
183       this._beginWSSetup();
184       return;
185     }
187     if (timer == this._requestTimeoutTimer) {
188       this._timeOutRequests();
189     }
190   },
192   /**
193    * Sends a ping to the server. Bypasses the request queue, but starts the
194    * request timeout timer. If the socket is already closed, or the server
195    * does not respond within the timeout, the client will reconnect.
196    */
197   _sendPing() {
198     lazy.console.debug("sendPing()");
200     this._startRequestTimeoutTimer();
201     try {
202       this._wsSendMessage({});
203       this._lastPingTime = Date.now();
204     } catch (e) {
205       lazy.console.debug("sendPing: Error sending ping", e);
206       this._reconnect();
207     }
208   },
210   /** Times out any pending requests. */
211   _timeOutRequests() {
212     lazy.console.debug("timeOutRequests()");
214     if (!this._hasPendingRequests()) {
215       // Cancel the repeating timer and exit early if we aren't waiting for
216       // pongs or requests.
217       this._requestTimeoutTimer.cancel();
218       return;
219     }
221     let now = Date.now();
223     // Set to true if at least one request timed out, or we're still waiting
224     // for a pong after the request timeout.
225     let requestTimedOut = false;
227     if (
228       this._lastPingTime > 0 &&
229       now - this._lastPingTime > this._requestTimeout
230     ) {
231       lazy.console.debug("timeOutRequests: Did not receive pong in time");
232       requestTimedOut = true;
233     } else {
234       for (let [key, request] of this._pendingRequests) {
235         let duration = now - request.ctime;
236         // If any of the registration requests time out, all the ones after it
237         // also made to fail, since we are going to be disconnecting the
238         // socket.
239         requestTimedOut |= duration > this._requestTimeout;
240         if (requestTimedOut) {
241           request.reject(new Error("Request timed out: " + key));
242           this._pendingRequests.delete(key);
243         }
244       }
245     }
247     // The most likely reason for a pong or registration request timing out is
248     // that the socket has disconnected. Best to reconnect.
249     if (requestTimedOut) {
250       this._reconnect();
251     }
252   },
254   get _UAID() {
255     return prefs.getStringPref("userAgentID");
256   },
258   set _UAID(newID) {
259     if (typeof newID !== "string") {
260       lazy.console.warn(
261         "Got invalid, non-string UAID",
262         newID,
263         "Not updating userAgentID"
264       );
265       return;
266     }
267     lazy.console.debug("New _UAID", newID);
268     prefs.setStringPref("userAgentID", newID);
269   },
271   _ws: null,
272   _pendingRequests: new Map(),
273   _currentState: STATE_SHUT_DOWN,
274   _requestTimeout: 0,
275   _requestTimeoutTimer: null,
276   _retryFailCount: 0,
278   /**
279    * According to the WS spec, servers should immediately close the underlying
280    * TCP connection after they close a WebSocket. This causes wsOnStop to be
281    * called with error NS_BASE_STREAM_CLOSED. Since the client has to keep the
282    * WebSocket up, it should try to reconnect. But if the server closes the
283    * WebSocket because it wants the client to back off, then the client
284    * shouldn't re-establish the connection. If the server sends the backoff
285    * close code, this field will be set to true in wsOnServerClose. It is
286    * checked in wsOnStop.
287    */
288   _skipReconnect: false,
290   /** Indicates whether the server supports Web Push-style message delivery. */
291   _dataEnabled: false,
293   /**
294    * The last time the client sent a ping to the server. If non-zero, keeps the
295    * request timeout timer active. Reset to zero when the server responds with
296    * a pong or pending messages.
297    */
298   _lastPingTime: 0,
300   /**
301    * A one-shot timer used to ping the server, to avoid timing out idle
302    * connections. Reset to the ping interval on each incoming message.
303    */
304   _pingTimer: null,
306   /** A one-shot timer fired after the reconnect backoff period. */
307   _backoffTimer: null,
309   /**
310    * Sends a message to the Push Server through an open websocket.
311    * typeof(msg) shall be an object
312    */
313   _wsSendMessage(msg) {
314     if (!this._ws) {
315       lazy.console.warn(
316         "wsSendMessage: No WebSocket initialized.",
317         "Cannot send a message"
318       );
319       return;
320     }
321     msg = JSON.stringify(msg);
322     lazy.console.debug("wsSendMessage: Sending message", msg);
323     this._ws.sendMsg(msg);
324   },
326   init(options, mainPushService, serverURI) {
327     lazy.console.debug("init()");
329     this._mainPushService = mainPushService;
330     this._serverURI = serverURI;
331     // Filled in at connect() time
332     this._broadcastListeners = null;
334     // Override the default WebSocket factory function. The returned object
335     // must be null or satisfy the nsIWebSocketChannel interface. Used by
336     // the tests to provide a mock WebSocket implementation.
337     if (options.makeWebSocket) {
338       this._makeWebSocket = options.makeWebSocket;
339     }
341     this._requestTimeout = prefs.getIntPref("requestTimeout");
343     return Promise.resolve();
344   },
346   _reconnect() {
347     lazy.console.debug("reconnect()");
348     this._shutdownWS(false);
349     this._startBackoffTimer();
350   },
352   _shutdownWS(shouldCancelPending = true) {
353     lazy.console.debug("shutdownWS()");
355     if (this._currentState == STATE_READY) {
356       prefs.removeObserver("userAgentID", this);
357     }
359     this._currentState = STATE_SHUT_DOWN;
360     this._skipReconnect = false;
362     if (this._wsListener) {
363       this._wsListener._pushService = null;
364     }
365     try {
366       this._ws.close(0, null);
367     } catch (e) {}
368     this._ws = null;
370     this._lastPingTime = 0;
372     if (this._pingTimer) {
373       this._pingTimer.cancel();
374     }
376     if (shouldCancelPending) {
377       this._cancelPendingRequests();
378     }
380     if (this._notifyRequestQueue) {
381       this._notifyRequestQueue();
382       this._notifyRequestQueue = null;
383     }
384   },
386   uninit() {
387     // All pending requests (ideally none) are dropped at this point. We
388     // shouldn't have any applications performing registration/unregistration
389     // or receiving notifications.
390     this._shutdownWS();
392     if (this._backoffTimer) {
393       this._backoffTimer.cancel();
394     }
395     if (this._requestTimeoutTimer) {
396       this._requestTimeoutTimer.cancel();
397     }
399     this._mainPushService = null;
401     this._dataEnabled = false;
402   },
404   /**
405    * How retries work: If the WS is closed due to a socket error,
406    * _startBackoffTimer() is called. The retry timer is started and when
407    * it times out, beginWSSetup() is called again.
408    *
409    * If we are in the middle of a timeout (i.e. waiting), but
410    * a register/unregister is called, we don't want to wait around anymore.
411    * _sendRequest will automatically call beginWSSetup(), which will cancel the
412    * timer. In addition since the state will have changed, even if a pending
413    * timer event comes in (because the timer fired the event before it was
414    * cancelled), so the connection won't be reset.
415    */
416   _startBackoffTimer() {
417     lazy.console.debug("startBackoffTimer()");
419     // Calculate new timeout, but cap it to pingInterval.
420     let retryTimeout =
421       prefs.getIntPref("retryBaseInterval") * Math.pow(2, this._retryFailCount);
422     retryTimeout = Math.min(retryTimeout, prefs.getIntPref("pingInterval"));
424     this._retryFailCount++;
426     lazy.console.debug(
427       "startBackoffTimer: Retry in",
428       retryTimeout,
429       "Try number",
430       this._retryFailCount
431     );
433     if (!this._backoffTimer) {
434       this._backoffTimer = Cc["@mozilla.org/timer;1"].createInstance(
435         Ci.nsITimer
436       );
437     }
438     this._backoffTimer.init(this, retryTimeout, Ci.nsITimer.TYPE_ONE_SHOT);
439   },
441   /** Indicates whether we're waiting for pongs or requests. */
442   _hasPendingRequests() {
443     return this._lastPingTime > 0 || this._pendingRequests.size > 0;
444   },
446   /**
447    * Starts the request timeout timer unless we're already waiting for a pong
448    * or register request.
449    */
450   _startRequestTimeoutTimer() {
451     if (this._hasPendingRequests()) {
452       return;
453     }
454     if (!this._requestTimeoutTimer) {
455       this._requestTimeoutTimer = Cc["@mozilla.org/timer;1"].createInstance(
456         Ci.nsITimer
457       );
458     }
459     this._requestTimeoutTimer.init(
460       this,
461       this._requestTimeout,
462       Ci.nsITimer.TYPE_REPEATING_SLACK
463     );
464   },
466   /** Starts or resets the ping timer. */
467   _startPingTimer() {
468     if (!this._pingTimer) {
469       this._pingTimer = Cc["@mozilla.org/timer;1"].createInstance(Ci.nsITimer);
470     }
471     this._pingTimer.init(
472       this,
473       prefs.getIntPref("pingInterval"),
474       Ci.nsITimer.TYPE_ONE_SHOT
475     );
476   },
478   _makeWebSocket(uri) {
479     if (!prefs.getBoolPref("connection.enabled")) {
480       lazy.console.warn(
481         "makeWebSocket: connection.enabled is not set to true.",
482         "Aborting."
483       );
484       return null;
485     }
486     if (Services.io.offline) {
487       lazy.console.warn("makeWebSocket: Network is offline.");
488       return null;
489     }
490     let contractId =
491       uri.scheme == "ws"
492         ? "@mozilla.org/network/protocol;1?name=ws"
493         : "@mozilla.org/network/protocol;1?name=wss";
494     let socket = Cc[contractId].createInstance(Ci.nsIWebSocketChannel);
496     socket.initLoadInfo(
497       null, // aLoadingNode
498       Services.scriptSecurityManager.getSystemPrincipal(),
499       null, // aTriggeringPrincipal
500       Ci.nsILoadInfo.SEC_ALLOW_CROSS_ORIGIN_SEC_CONTEXT_IS_NULL,
501       Ci.nsIContentPolicy.TYPE_WEBSOCKET
502     );
503     // Allow deprecated HTTP request from SystemPrincipal
504     socket.loadInfo.allowDeprecatedSystemRequests = true;
506     return socket;
507   },
509   _beginWSSetup() {
510     lazy.console.debug("beginWSSetup()");
511     if (this._currentState != STATE_SHUT_DOWN) {
512       lazy.console.error(
513         "_beginWSSetup: Not in shutdown state! Current state",
514         this._currentState
515       );
516       return;
517     }
519     // Stop any pending reconnects scheduled for the near future.
520     if (this._backoffTimer) {
521       this._backoffTimer.cancel();
522     }
524     let uri = this._serverURI;
525     if (!uri) {
526       return;
527     }
528     let socket = this._makeWebSocket(uri);
529     if (!socket) {
530       return;
531     }
532     this._ws = socket.QueryInterface(Ci.nsIWebSocketChannel);
534     lazy.console.debug("beginWSSetup: Connecting to", uri.spec);
535     this._wsListener = new PushWebSocketListener(this);
536     this._ws.protocol = "push-notification";
538     try {
539       // Grab a wakelock before we open the socket to ensure we don't go to
540       // sleep before connection the is opened.
541       this._ws.asyncOpen(uri, uri.spec, {}, 0, this._wsListener, null);
542       this._currentState = STATE_WAITING_FOR_WS_START;
543     } catch (e) {
544       lazy.console.error(
545         "beginWSSetup: Error opening websocket.",
546         "asyncOpen failed",
547         e
548       );
549       this._reconnect();
550     }
551   },
553   connect(broadcastListeners) {
554     lazy.console.debug("connect()", broadcastListeners);
555     this._broadcastListeners = broadcastListeners;
556     this._beginWSSetup();
557   },
559   isConnected() {
560     return !!this._ws;
561   },
563   /**
564    * Protocol handler invoked by server message.
565    */
566   _handleHelloReply(reply) {
567     lazy.console.debug("handleHelloReply()");
568     if (this._currentState != STATE_WAITING_FOR_HELLO) {
569       lazy.console.error(
570         "handleHelloReply: Unexpected state",
571         this._currentState,
572         "(expected STATE_WAITING_FOR_HELLO)"
573       );
574       this._shutdownWS();
575       return;
576     }
578     if (typeof reply.uaid !== "string") {
579       lazy.console.error("handleHelloReply: Received invalid UAID", reply.uaid);
580       this._shutdownWS();
581       return;
582     }
584     if (reply.uaid === "") {
585       lazy.console.error("handleHelloReply: Received empty UAID");
586       this._shutdownWS();
587       return;
588     }
590     // To avoid sticking extra large values sent by an evil server into prefs.
591     if (reply.uaid.length > 128) {
592       lazy.console.error(
593         "handleHelloReply: UAID received from server was too long",
594         reply.uaid
595       );
596       this._shutdownWS();
597       return;
598     }
600     let sendRequests = () => {
601       if (this._notifyRequestQueue) {
602         this._notifyRequestQueue();
603         this._notifyRequestQueue = null;
604       }
605       this._sendPendingRequests();
606     };
608     function finishHandshake() {
609       this._UAID = reply.uaid;
610       this._currentState = STATE_READY;
611       prefs.addObserver("userAgentID", this);
613       // Handle broadcasts received in response to the "hello" message.
614       if (!lazy.ObjectUtils.isEmpty(reply.broadcasts)) {
615         // The reply isn't technically a broadcast message, but it has
616         // the shape of a broadcast message (it has a broadcasts field).
617         const context = { phase: lazy.pushBroadcastService.PHASES.HELLO };
618         this._mainPushService.receivedBroadcastMessage(reply, context);
619       }
621       this._dataEnabled = !!reply.use_webpush;
622       if (this._dataEnabled) {
623         this._mainPushService
624           .getAllUnexpired()
625           .then(records =>
626             Promise.all(
627               records.map(record =>
628                 this._mainPushService.ensureCrypto(record).catch(error => {
629                   lazy.console.error(
630                     "finishHandshake: Error updating record",
631                     record.keyID,
632                     error
633                   );
634                 })
635               )
636             )
637           )
638           .then(sendRequests);
639       } else {
640         sendRequests();
641       }
642     }
644     // By this point we've got a UAID from the server that we are ready to
645     // accept.
646     //
647     // We unconditionally drop all existing registrations and notify service
648     // workers if we receive a new UAID. This ensures we expunge all stale
649     // registrations if the `userAgentID` pref is reset.
650     if (this._UAID != reply.uaid) {
651       lazy.console.debug("handleHelloReply: Received new UAID");
653       this._mainPushService
654         .dropUnexpiredRegistrations()
655         .then(finishHandshake.bind(this));
657       return;
658     }
660     // otherwise we are good to go
661     finishHandshake.bind(this)();
662   },
664   /**
665    * Protocol handler invoked by server message.
666    */
667   _handleRegisterReply(reply) {
668     lazy.console.debug("handleRegisterReply()");
670     let tmp = this._takeRequestForReply(reply);
671     if (!tmp) {
672       return;
673     }
675     if (reply.status == 200) {
676       try {
677         Services.io.newURI(reply.pushEndpoint);
678       } catch (e) {
679         tmp.reject(new Error("Invalid push endpoint: " + reply.pushEndpoint));
680         return;
681       }
683       let record = new PushRecordWebSocket({
684         channelID: reply.channelID,
685         pushEndpoint: reply.pushEndpoint,
686         scope: tmp.record.scope,
687         originAttributes: tmp.record.originAttributes,
688         version: null,
689         systemRecord: tmp.record.systemRecord,
690         appServerKey: tmp.record.appServerKey,
691         ctime: Date.now(),
692       });
693       tmp.resolve(record);
694     } else {
695       lazy.console.error(
696         "handleRegisterReply: Unexpected server response",
697         reply
698       );
699       tmp.reject(
700         new Error("Wrong status code for register reply: " + reply.status)
701       );
702     }
703   },
705   _handleUnregisterReply(reply) {
706     lazy.console.debug("handleUnregisterReply()");
708     let request = this._takeRequestForReply(reply);
709     if (!request) {
710       return;
711     }
713     let success = reply.status === 200;
714     request.resolve(success);
715   },
717   _handleDataUpdate(update) {
718     let promise;
719     if (typeof update.channelID != "string") {
720       lazy.console.warn(
721         "handleDataUpdate: Discarding update without channel ID",
722         update
723       );
724       return;
725     }
726     function updateRecord(record) {
727       // Ignore messages that we've already processed. This can happen if the
728       // connection drops between notifying the service worker and acking the
729       // the message. In that case, the server will re-send the message on
730       // reconnect.
731       if (record.hasRecentMessageID(update.version)) {
732         lazy.console.warn(
733           "handleDataUpdate: Ignoring duplicate message",
734           update.version
735         );
736         return null;
737       }
738       record.noteRecentMessageID(update.version);
739       return record;
740     }
741     if (typeof update.data != "string") {
742       promise = this._mainPushService.receivedPushMessage(
743         update.channelID,
744         update.version,
745         null,
746         null,
747         updateRecord
748       );
749     } else {
750       let message = ChromeUtils.base64URLDecode(update.data, {
751         // The Push server may append padding.
752         padding: "ignore",
753       });
754       promise = this._mainPushService.receivedPushMessage(
755         update.channelID,
756         update.version,
757         update.headers,
758         message,
759         updateRecord
760       );
761     }
762     promise
763       .then(
764         status => {
765           this._sendAck(update.channelID, update.version, status);
766         },
767         err => {
768           lazy.console.error(
769             "handleDataUpdate: Error delivering message",
770             update,
771             err
772           );
773           this._sendAck(
774             update.channelID,
775             update.version,
776             Ci.nsIPushErrorReporter.ACK_DECRYPTION_ERROR
777           );
778         }
779       )
780       .catch(err => {
781         lazy.console.error(
782           "handleDataUpdate: Error acknowledging message",
783           update,
784           err
785         );
786       });
787   },
789   /**
790    * Protocol handler invoked by server message.
791    */
792   _handleNotificationReply(reply) {
793     lazy.console.debug("handleNotificationReply()");
794     if (this._dataEnabled) {
795       this._handleDataUpdate(reply);
796       return;
797     }
799     if (typeof reply.updates !== "object") {
800       lazy.console.warn(
801         "handleNotificationReply: Missing updates",
802         reply.updates
803       );
804       return;
805     }
807     lazy.console.debug("handleNotificationReply: Got updates", reply.updates);
808     for (let i = 0; i < reply.updates.length; i++) {
809       let update = reply.updates[i];
810       lazy.console.debug("handleNotificationReply: Handling update", update);
811       if (typeof update.channelID !== "string") {
812         lazy.console.debug(
813           "handleNotificationReply: Invalid update at index",
814           i,
815           update
816         );
817         continue;
818       }
820       if (update.version === undefined) {
821         lazy.console.debug("handleNotificationReply: Missing version", update);
822         continue;
823       }
825       let version = update.version;
827       if (typeof version === "string") {
828         version = parseInt(version, 10);
829       }
831       if (typeof version === "number" && version >= 0) {
832         // FIXME(nsm): this relies on app update notification being infallible!
833         // eventually fix this
834         this._receivedUpdate(update.channelID, version);
835       }
836     }
837   },
839   _handleBroadcastReply(reply) {
840     let phase = lazy.pushBroadcastService.PHASES.BROADCAST;
841     // Check if this reply is the result of registration.
842     for (const id of Object.keys(reply.broadcasts)) {
843       const wasRegistering = this._currentlyRegistering.delete(id);
844       if (wasRegistering) {
845         // If we get multiple broadcasts and only one is "registering",
846         // then we consider the phase to be REGISTER for all of them.
847         // It is acceptable since registrations do not happen so often,
848         // and are all very likely to occur soon after browser startup.
849         phase = lazy.pushBroadcastService.PHASES.REGISTER;
850       }
851     }
852     const context = { phase };
853     this._mainPushService.receivedBroadcastMessage(reply, context);
854   },
856   reportDeliveryError(messageID, reason) {
857     lazy.console.debug("reportDeliveryError()");
858     let code = kDELIVERY_REASON_TO_CODE[reason];
859     if (!code) {
860       throw new Error("Invalid delivery error reason");
861     }
862     let data = { messageType: "nack", version: messageID, code };
863     this._queueRequest(data);
864   },
866   _sendAck(channelID, version, status) {
867     lazy.console.debug("sendAck()");
868     let code = kACK_STATUS_TO_CODE[status];
869     if (!code) {
870       throw new Error("Invalid ack status");
871     }
872     let data = { messageType: "ack", updates: [{ channelID, version, code }] };
873     this._queueRequest(data);
874   },
876   _generateID() {
877     // generateUUID() gives a UUID surrounded by {...}, slice them off.
878     return Services.uuid
879       .generateUUID()
880       .toString()
881       .slice(1, -1);
882   },
884   register(record) {
885     lazy.console.debug("register() ", record);
887     let data = { channelID: this._generateID(), messageType: "register" };
889     if (record.appServerKey) {
890       data.key = ChromeUtils.base64URLEncode(record.appServerKey, {
891         // The Push server requires padding.
892         pad: true,
893       });
894     }
896     return this._sendRequestForReply(record, data).then(record => {
897       if (!this._dataEnabled) {
898         return record;
899       }
900       return PushCrypto.generateKeys().then(([publicKey, privateKey]) => {
901         record.p256dhPublicKey = publicKey;
902         record.p256dhPrivateKey = privateKey;
903         record.authenticationSecret = PushCrypto.generateAuthenticationSecret();
904         return record;
905       });
906     });
907   },
909   unregister(record, reason) {
910     lazy.console.debug("unregister() ", record, reason);
912     return Promise.resolve().then(_ => {
913       let code = kUNREGISTER_REASON_TO_CODE[reason];
914       if (!code) {
915         throw new Error("Invalid unregister reason");
916       }
917       let data = {
918         channelID: record.channelID,
919         messageType: "unregister",
920         code,
921       };
923       return this._sendRequestForReply(record, data);
924     });
925   },
927   _queueStart: Promise.resolve(),
928   _notifyRequestQueue: null,
929   _queue: null,
930   _enqueue(op) {
931     lazy.console.debug("enqueue()");
932     if (!this._queue) {
933       this._queue = this._queueStart;
934     }
935     this._queue = this._queue.then(op).catch(_ => {});
936   },
938   /** Sends a request to the server. */
939   _send(data) {
940     if (this._currentState != STATE_READY) {
941       lazy.console.warn(
942         "send: Unexpected state; ignoring message",
943         this._currentState
944       );
945       return;
946     }
947     if (!this._requestHasReply(data)) {
948       this._wsSendMessage(data);
949       return;
950     }
951     // If we're expecting a reply, check that we haven't cancelled the request.
952     let key = this._makePendingRequestKey(data);
953     if (!this._pendingRequests.has(key)) {
954       lazy.console.log("send: Request cancelled; ignoring message", key);
955       return;
956     }
957     this._wsSendMessage(data);
958   },
960   /** Indicates whether a request has a corresponding reply from the server. */
961   _requestHasReply(data) {
962     return data.messageType == "register" || data.messageType == "unregister";
963   },
965   /**
966    * Sends all pending requests that expect replies. Called after the connection
967    * is established and the handshake is complete.
968    */
969   _sendPendingRequests() {
970     this._enqueue(_ => {
971       for (let request of this._pendingRequests.values()) {
972         this._send(request.data);
973       }
974     });
975   },
977   /** Queues an outgoing request, establishing a connection if necessary. */
978   _queueRequest(data) {
979     lazy.console.debug("queueRequest()", data);
981     if (this._currentState == STATE_READY) {
982       // If we're ready, no need to queue; just send the request.
983       this._send(data);
984       return;
985     }
987     // Otherwise, we're still setting up. If we don't have a request queue,
988     // make one now.
989     if (!this._notifyRequestQueue) {
990       let promise = new Promise((resolve, reject) => {
991         this._notifyRequestQueue = resolve;
992       });
993       this._enqueue(_ => promise);
994     }
996     let isRequest = this._requestHasReply(data);
997     if (!isRequest) {
998       // Don't queue requests, since they're stored in `_pendingRequests`, and
999       // `_sendPendingRequests` will send them after reconnecting. Without this
1000       // check, we'd send requests twice.
1001       this._enqueue(_ => this._send(data));
1002     }
1004     if (!this._ws) {
1005       // This will end up calling notifyRequestQueue().
1006       this._beginWSSetup();
1007       // If beginWSSetup does not succeed to make ws, notifyRequestQueue will
1008       // not be call.
1009       if (!this._ws && this._notifyRequestQueue) {
1010         this._notifyRequestQueue();
1011         this._notifyRequestQueue = null;
1012       }
1013     }
1014   },
1016   _receivedUpdate(aChannelID, aLatestVersion) {
1017     lazy.console.debug(
1018       "receivedUpdate: Updating",
1019       aChannelID,
1020       "->",
1021       aLatestVersion
1022     );
1024     this._mainPushService
1025       .receivedPushMessage(aChannelID, "", null, null, record => {
1026         if (record.version === null || record.version < aLatestVersion) {
1027           lazy.console.debug(
1028             "receivedUpdate: Version changed for",
1029             aChannelID,
1030             aLatestVersion
1031           );
1032           record.version = aLatestVersion;
1033           return record;
1034         }
1035         lazy.console.debug(
1036           "receivedUpdate: No significant version change for",
1037           aChannelID,
1038           aLatestVersion
1039         );
1040         return null;
1041       })
1042       .then(status => {
1043         this._sendAck(aChannelID, aLatestVersion, status);
1044       })
1045       .catch(err => {
1046         lazy.console.error(
1047           "receivedUpdate: Error acknowledging message",
1048           aChannelID,
1049           aLatestVersion,
1050           err
1051         );
1052       });
1053   },
1055   // begin Push protocol handshake
1056   _wsOnStart(context) {
1057     lazy.console.debug("wsOnStart()");
1059     if (this._currentState != STATE_WAITING_FOR_WS_START) {
1060       lazy.console.error(
1061         "wsOnStart: NOT in STATE_WAITING_FOR_WS_START. Current",
1062         "state",
1063         this._currentState,
1064         "Skipping"
1065       );
1066       return;
1067     }
1069     this._mainPushService
1070       .getAllUnexpired()
1071       .then(
1072         records => this._sendHello(records),
1073         err => {
1074           lazy.console.warn(
1075             "Error fetching existing records before handshake; assuming none",
1076             err
1077           );
1078           this._sendHello([]);
1079         }
1080       )
1081       .catch(err => {
1082         // If we failed to send the handshake, back off and reconnect.
1083         lazy.console.warn("Failed to send handshake; reconnecting", err);
1084         this._reconnect();
1085       });
1086   },
1088   /**
1089    * Sends a `hello` handshake to the server.
1090    *
1091    * @param {Array<PushRecordWebSocket>} An array of records for existing
1092    *        subscriptions, used to determine whether to rotate our UAID.
1093    */
1094   _sendHello(records) {
1095     let data = {
1096       messageType: "hello",
1097       broadcasts: this._broadcastListeners,
1098       use_webpush: true,
1099     };
1101     if (records.length && this._UAID) {
1102       // Only send our UAID if we have existing push subscriptions, to
1103       // avoid tying a persistent identifier to the connection (bug
1104       // 1617136). The push server will issue our client a new UAID in
1105       // the `hello` response, which we'll store until either the next
1106       // time we reconnect, or the user subscribes to push. Once we have a
1107       // push subscription, we'll stop rotating the UAID when we connect,
1108       // so that we can receive push messages for them.
1109       data.uaid = this._UAID;
1110     }
1112     this._wsSendMessage(data);
1113     this._currentState = STATE_WAITING_FOR_HELLO;
1114   },
1116   /**
1117    * This statusCode is not the websocket protocol status code, but the TCP
1118    * connection close status code.
1119    *
1120    * If we do not explicitly call ws.close() then statusCode is always
1121    * NS_BASE_STREAM_CLOSED, even on a successful close.
1122    */
1123   _wsOnStop(context, statusCode) {
1124     lazy.console.debug("wsOnStop()");
1126     if (statusCode != Cr.NS_OK && !this._skipReconnect) {
1127       lazy.console.debug(
1128         "wsOnStop: Reconnecting after socket error",
1129         statusCode
1130       );
1131       this._reconnect();
1132       return;
1133     }
1135     this._shutdownWS();
1136   },
1138   _wsOnMessageAvailable(context, message) {
1139     lazy.console.debug("wsOnMessageAvailable()", message);
1141     // Clearing the last ping time indicates we're no longer waiting for a pong.
1142     this._lastPingTime = 0;
1144     let reply;
1145     try {
1146       reply = JSON.parse(message);
1147     } catch (e) {
1148       lazy.console.warn("wsOnMessageAvailable: Invalid JSON", message, e);
1149       return;
1150     }
1152     // If we receive a message, we know the connection succeeded. Reset the
1153     // connection attempt and ping interval counters.
1154     this._retryFailCount = 0;
1156     let doNotHandle = false;
1157     if (
1158       message === "{}" ||
1159       reply.messageType === undefined ||
1160       reply.messageType === "ping" ||
1161       typeof reply.messageType != "string"
1162     ) {
1163       lazy.console.debug("wsOnMessageAvailable: Pong received");
1164       doNotHandle = true;
1165     }
1167     // Reset the ping timer.  Note: This path is executed at every step of the
1168     // handshake, so this timer does not need to be set explicitly at startup.
1169     this._startPingTimer();
1171     // If it is a ping, do not handle the message.
1172     if (doNotHandle) {
1173       return;
1174     }
1176     // A whitelist of protocol handlers. Add to these if new messages are added
1177     // in the protocol.
1178     let handlers = [
1179       "Hello",
1180       "Register",
1181       "Unregister",
1182       "Notification",
1183       "Broadcast",
1184     ];
1186     // Build up the handler name to call from messageType.
1187     // e.g. messageType == "register" -> _handleRegisterReply.
1188     let handlerName =
1189       reply.messageType[0].toUpperCase() +
1190       reply.messageType.slice(1).toLowerCase();
1192     if (!handlers.includes(handlerName)) {
1193       lazy.console.warn(
1194         "wsOnMessageAvailable: No whitelisted handler",
1195         handlerName,
1196         "for message",
1197         reply.messageType
1198       );
1199       return;
1200     }
1202     let handler = "_handle" + handlerName + "Reply";
1204     if (typeof this[handler] !== "function") {
1205       lazy.console.warn(
1206         "wsOnMessageAvailable: Handler",
1207         handler,
1208         "whitelisted but not implemented"
1209       );
1210       return;
1211     }
1213     this[handler](reply);
1214   },
1216   /**
1217    * The websocket should never be closed. Since we don't call ws.close(),
1218    * _wsOnStop() receives error code NS_BASE_STREAM_CLOSED (see comment in that
1219    * function), which calls reconnect and re-establishes the WebSocket
1220    * connection.
1221    *
1222    * If the server requested that we back off, we won't reconnect until the
1223    * next network state change event, or until we need to send a new register
1224    * request.
1225    */
1226   _wsOnServerClose(context, aStatusCode, aReason) {
1227     lazy.console.debug("wsOnServerClose()", aStatusCode, aReason);
1229     if (aStatusCode == kBACKOFF_WS_STATUS_CODE) {
1230       lazy.console.debug("wsOnServerClose: Skipping automatic reconnect");
1231       this._skipReconnect = true;
1232     }
1233   },
1235   /**
1236    * Rejects all pending register requests with errors.
1237    */
1238   _cancelPendingRequests() {
1239     for (let request of this._pendingRequests.values()) {
1240       request.reject(new Error("Request aborted"));
1241     }
1242     this._pendingRequests.clear();
1243   },
1245   /** Creates a case-insensitive map key for a request that expects a reply. */
1246   _makePendingRequestKey(data) {
1247     return (data.messageType + "|" + data.channelID).toLowerCase();
1248   },
1250   /** Sends a request and waits for a reply from the server. */
1251   _sendRequestForReply(record, data) {
1252     return Promise.resolve().then(_ => {
1253       // start the timer since we now have at least one request
1254       this._startRequestTimeoutTimer();
1256       let key = this._makePendingRequestKey(data);
1257       if (!this._pendingRequests.has(key)) {
1258         let request = {
1259           data,
1260           record,
1261           ctime: Date.now(),
1262         };
1263         request.promise = new Promise((resolve, reject) => {
1264           request.resolve = resolve;
1265           request.reject = reject;
1266         });
1267         this._pendingRequests.set(key, request);
1268         this._queueRequest(data);
1269       }
1271       return this._pendingRequests.get(key).promise;
1272     });
1273   },
1275   /** Removes and returns a pending request for a server reply. */
1276   _takeRequestForReply(reply) {
1277     if (typeof reply.channelID !== "string") {
1278       return null;
1279     }
1280     let key = this._makePendingRequestKey(reply);
1281     let request = this._pendingRequests.get(key);
1282     if (!request) {
1283       return null;
1284     }
1285     this._pendingRequests.delete(key);
1286     if (!this._hasPendingRequests()) {
1287       this._requestTimeoutTimer.cancel();
1288     }
1289     return request;
1290   },
1292   sendSubscribeBroadcast(serviceId, version) {
1293     this._currentlyRegistering.add(serviceId);
1294     let data = {
1295       messageType: "broadcast_subscribe",
1296       broadcasts: {
1297         [serviceId]: version,
1298       },
1299     };
1301     this._queueRequest(data);
1302   },
1305 function PushRecordWebSocket(record) {
1306   PushRecord.call(this, record);
1307   this.channelID = record.channelID;
1308   this.version = record.version;
1311 PushRecordWebSocket.prototype = Object.create(PushRecord.prototype, {
1312   keyID: {
1313     get() {
1314       return this.channelID;
1315     },
1316   },
1319 PushRecordWebSocket.prototype.toSubscription = function() {
1320   let subscription = PushRecord.prototype.toSubscription.call(this);
1321   subscription.version = this.version;
1322   return subscription;