Bug 1835710 - Cancel off-thread JIT compilation before changing nursery allocation...
[gecko.git] / dom / push / PushServiceWebSocket.sys.mjs
blob997976fa2995369a563c9098ebdd57c53b73a984
1 /* This Source Code Form is subject to the terms of the Mozilla Public
2  * License, v. 2.0. If a copy of the MPL was not distributed with this file,
3  * You can obtain one at http://mozilla.org/MPL/2.0/. */
5 import { XPCOMUtils } from "resource://gre/modules/XPCOMUtils.sys.mjs";
7 import { PushDB } from "resource://gre/modules/PushDB.sys.mjs";
8 import { PushRecord } from "resource://gre/modules/PushRecord.sys.mjs";
9 import { PushCrypto } from "resource://gre/modules/PushCrypto.sys.mjs";
11 const lazy = {};
13 ChromeUtils.defineESModuleGetters(lazy, {
14   pushBroadcastService: "resource://gre/modules/PushBroadcastService.sys.mjs",
15 });
16 ChromeUtils.defineModuleGetter(
17   lazy,
18   "ObjectUtils",
19   "resource://gre/modules/ObjectUtils.jsm"
22 const kPUSHWSDB_DB_NAME = "pushapi";
23 const kPUSHWSDB_DB_VERSION = 5; // Change this if the IndexedDB format changes
24 const kPUSHWSDB_STORE_NAME = "pushapi";
26 // WebSocket close code sent by the server to indicate that the client should
27 // not automatically reconnect.
28 const kBACKOFF_WS_STATUS_CODE = 4774;
30 // Maps ack statuses, unsubscribe reasons, and delivery error reasons to codes
31 // included in request payloads.
32 const kACK_STATUS_TO_CODE = {
33   [Ci.nsIPushErrorReporter.ACK_DELIVERED]: 100,
34   [Ci.nsIPushErrorReporter.ACK_DECRYPTION_ERROR]: 101,
35   [Ci.nsIPushErrorReporter.ACK_NOT_DELIVERED]: 102,
38 const kUNREGISTER_REASON_TO_CODE = {
39   [Ci.nsIPushErrorReporter.UNSUBSCRIBE_MANUAL]: 200,
40   [Ci.nsIPushErrorReporter.UNSUBSCRIBE_QUOTA_EXCEEDED]: 201,
41   [Ci.nsIPushErrorReporter.UNSUBSCRIBE_PERMISSION_REVOKED]: 202,
44 const kDELIVERY_REASON_TO_CODE = {
45   [Ci.nsIPushErrorReporter.DELIVERY_UNCAUGHT_EXCEPTION]: 301,
46   [Ci.nsIPushErrorReporter.DELIVERY_UNHANDLED_REJECTION]: 302,
47   [Ci.nsIPushErrorReporter.DELIVERY_INTERNAL_ERROR]: 303,
50 const prefs = Services.prefs.getBranch("dom.push.");
52 XPCOMUtils.defineLazyGetter(lazy, "console", () => {
53   let { ConsoleAPI } = ChromeUtils.importESModule(
54     "resource://gre/modules/Console.sys.mjs"
55   );
56   return new ConsoleAPI({
57     maxLogLevelPref: "dom.push.loglevel",
58     prefix: "PushServiceWebSocket",
59   });
60 });
62 /**
63  * A proxy between the PushService and the WebSocket. The listener is used so
64  * that the PushService can silence messages from the WebSocket by setting
65  * PushWebSocketListener._pushService to null. This is required because
66  * a WebSocket can continue to send messages or errors after it has been
67  * closed but the PushService may not be interested in these. It's easier to
68  * stop listening than to have checks at specific points.
69  */
70 var PushWebSocketListener = function (pushService) {
71   this._pushService = pushService;
74 PushWebSocketListener.prototype = {
75   onStart(context) {
76     if (!this._pushService) {
77       return;
78     }
79     this._pushService._wsOnStart(context);
80   },
82   onStop(context, statusCode) {
83     if (!this._pushService) {
84       return;
85     }
86     this._pushService._wsOnStop(context, statusCode);
87   },
89   onAcknowledge(context, size) {
90     // EMPTY
91   },
93   onBinaryMessageAvailable(context, message) {
94     // EMPTY
95   },
97   onMessageAvailable(context, message) {
98     if (!this._pushService) {
99       return;
100     }
101     this._pushService._wsOnMessageAvailable(context, message);
102   },
104   onServerClose(context, aStatusCode, aReason) {
105     if (!this._pushService) {
106       return;
107     }
108     this._pushService._wsOnServerClose(context, aStatusCode, aReason);
109   },
112 // websocket states
113 // websocket is off
114 const STATE_SHUT_DOWN = 0;
115 // Websocket has been opened on client side, waiting for successful open.
116 // (_wsOnStart)
117 const STATE_WAITING_FOR_WS_START = 1;
118 // Websocket opened, hello sent, waiting for server reply (_handleHelloReply).
119 const STATE_WAITING_FOR_HELLO = 2;
120 // Websocket operational, handshake completed, begin protocol messaging.
121 const STATE_READY = 3;
123 export var PushServiceWebSocket = {
124   QueryInterface: ChromeUtils.generateQI(["nsINamed", "nsIObserver"]),
125   name: "PushServiceWebSocket",
127   _mainPushService: null,
128   _serverURI: null,
129   _currentlyRegistering: new Set(),
131   newPushDB() {
132     return new PushDB(
133       kPUSHWSDB_DB_NAME,
134       kPUSHWSDB_DB_VERSION,
135       kPUSHWSDB_STORE_NAME,
136       "channelID",
137       PushRecordWebSocket
138     );
139   },
141   disconnect() {
142     this._shutdownWS();
143   },
145   observe(aSubject, aTopic, aData) {
146     if (aTopic == "nsPref:changed" && aData == "dom.push.userAgentID") {
147       this._onUAIDChanged();
148     } else if (aTopic == "timer-callback") {
149       this._onTimerFired(aSubject);
150     }
151   },
153   /**
154    * Handles a UAID change. Unlike reconnects, we cancel all pending requests
155    * after disconnecting. Existing subscriptions stored in IndexedDB will be
156    * dropped on reconnect.
157    */
158   _onUAIDChanged() {
159     lazy.console.debug("onUAIDChanged()");
161     this._shutdownWS();
162     this._startBackoffTimer();
163   },
165   /** Handles a ping, backoff, or request timeout timer event. */
166   _onTimerFired(timer) {
167     lazy.console.debug("onTimerFired()");
169     if (timer == this._pingTimer) {
170       this._sendPing();
171       return;
172     }
174     if (timer == this._backoffTimer) {
175       lazy.console.debug("onTimerFired: Reconnecting after backoff");
176       this._beginWSSetup();
177       return;
178     }
180     if (timer == this._requestTimeoutTimer) {
181       this._timeOutRequests();
182     }
183   },
185   /**
186    * Sends a ping to the server. Bypasses the request queue, but starts the
187    * request timeout timer. If the socket is already closed, or the server
188    * does not respond within the timeout, the client will reconnect.
189    */
190   _sendPing() {
191     lazy.console.debug("sendPing()");
193     this._startRequestTimeoutTimer();
194     try {
195       this._wsSendMessage({});
196       this._lastPingTime = Date.now();
197     } catch (e) {
198       lazy.console.debug("sendPing: Error sending ping", e);
199       this._reconnect();
200     }
201   },
203   /** Times out any pending requests. */
204   _timeOutRequests() {
205     lazy.console.debug("timeOutRequests()");
207     if (!this._hasPendingRequests()) {
208       // Cancel the repeating timer and exit early if we aren't waiting for
209       // pongs or requests.
210       this._requestTimeoutTimer.cancel();
211       return;
212     }
214     let now = Date.now();
216     // Set to true if at least one request timed out, or we're still waiting
217     // for a pong after the request timeout.
218     let requestTimedOut = false;
220     if (
221       this._lastPingTime > 0 &&
222       now - this._lastPingTime > this._requestTimeout
223     ) {
224       lazy.console.debug("timeOutRequests: Did not receive pong in time");
225       requestTimedOut = true;
226     } else {
227       for (let [key, request] of this._pendingRequests) {
228         let duration = now - request.ctime;
229         // If any of the registration requests time out, all the ones after it
230         // also made to fail, since we are going to be disconnecting the
231         // socket.
232         requestTimedOut |= duration > this._requestTimeout;
233         if (requestTimedOut) {
234           request.reject(new Error("Request timed out: " + key));
235           this._pendingRequests.delete(key);
236         }
237       }
238     }
240     // The most likely reason for a pong or registration request timing out is
241     // that the socket has disconnected. Best to reconnect.
242     if (requestTimedOut) {
243       this._reconnect();
244     }
245   },
247   get _UAID() {
248     return prefs.getStringPref("userAgentID");
249   },
251   set _UAID(newID) {
252     if (typeof newID !== "string") {
253       lazy.console.warn(
254         "Got invalid, non-string UAID",
255         newID,
256         "Not updating userAgentID"
257       );
258       return;
259     }
260     lazy.console.debug("New _UAID", newID);
261     prefs.setStringPref("userAgentID", newID);
262   },
264   _ws: null,
265   _pendingRequests: new Map(),
266   _currentState: STATE_SHUT_DOWN,
267   _requestTimeout: 0,
268   _requestTimeoutTimer: null,
269   _retryFailCount: 0,
271   /**
272    * According to the WS spec, servers should immediately close the underlying
273    * TCP connection after they close a WebSocket. This causes wsOnStop to be
274    * called with error NS_BASE_STREAM_CLOSED. Since the client has to keep the
275    * WebSocket up, it should try to reconnect. But if the server closes the
276    * WebSocket because it wants the client to back off, then the client
277    * shouldn't re-establish the connection. If the server sends the backoff
278    * close code, this field will be set to true in wsOnServerClose. It is
279    * checked in wsOnStop.
280    */
281   _skipReconnect: false,
283   /** Indicates whether the server supports Web Push-style message delivery. */
284   _dataEnabled: false,
286   /**
287    * The last time the client sent a ping to the server. If non-zero, keeps the
288    * request timeout timer active. Reset to zero when the server responds with
289    * a pong or pending messages.
290    */
291   _lastPingTime: 0,
293   /**
294    * A one-shot timer used to ping the server, to avoid timing out idle
295    * connections. Reset to the ping interval on each incoming message.
296    */
297   _pingTimer: null,
299   /** A one-shot timer fired after the reconnect backoff period. */
300   _backoffTimer: null,
302   /**
303    * Sends a message to the Push Server through an open websocket.
304    * typeof(msg) shall be an object
305    */
306   _wsSendMessage(msg) {
307     if (!this._ws) {
308       lazy.console.warn(
309         "wsSendMessage: No WebSocket initialized.",
310         "Cannot send a message"
311       );
312       return;
313     }
314     msg = JSON.stringify(msg);
315     lazy.console.debug("wsSendMessage: Sending message", msg);
316     this._ws.sendMsg(msg);
317   },
319   init(options, mainPushService, serverURI) {
320     lazy.console.debug("init()");
322     this._mainPushService = mainPushService;
323     this._serverURI = serverURI;
324     // Filled in at connect() time
325     this._broadcastListeners = null;
327     // Override the default WebSocket factory function. The returned object
328     // must be null or satisfy the nsIWebSocketChannel interface. Used by
329     // the tests to provide a mock WebSocket implementation.
330     if (options.makeWebSocket) {
331       this._makeWebSocket = options.makeWebSocket;
332     }
334     this._requestTimeout = prefs.getIntPref("requestTimeout");
336     return Promise.resolve();
337   },
339   _reconnect() {
340     lazy.console.debug("reconnect()");
341     this._shutdownWS(false);
342     this._startBackoffTimer();
343   },
345   _shutdownWS(shouldCancelPending = true) {
346     lazy.console.debug("shutdownWS()");
348     if (this._currentState == STATE_READY) {
349       prefs.removeObserver("userAgentID", this);
350     }
352     this._currentState = STATE_SHUT_DOWN;
353     this._skipReconnect = false;
355     if (this._wsListener) {
356       this._wsListener._pushService = null;
357     }
358     try {
359       this._ws.close(0, null);
360     } catch (e) {}
361     this._ws = null;
363     this._lastPingTime = 0;
365     if (this._pingTimer) {
366       this._pingTimer.cancel();
367     }
369     if (shouldCancelPending) {
370       this._cancelPendingRequests();
371     }
373     if (this._notifyRequestQueue) {
374       this._notifyRequestQueue();
375       this._notifyRequestQueue = null;
376     }
377   },
379   uninit() {
380     // All pending requests (ideally none) are dropped at this point. We
381     // shouldn't have any applications performing registration/unregistration
382     // or receiving notifications.
383     this._shutdownWS();
385     if (this._backoffTimer) {
386       this._backoffTimer.cancel();
387     }
388     if (this._requestTimeoutTimer) {
389       this._requestTimeoutTimer.cancel();
390     }
392     this._mainPushService = null;
394     this._dataEnabled = false;
395   },
397   /**
398    * How retries work: If the WS is closed due to a socket error,
399    * _startBackoffTimer() is called. The retry timer is started and when
400    * it times out, beginWSSetup() is called again.
401    *
402    * If we are in the middle of a timeout (i.e. waiting), but
403    * a register/unregister is called, we don't want to wait around anymore.
404    * _sendRequest will automatically call beginWSSetup(), which will cancel the
405    * timer. In addition since the state will have changed, even if a pending
406    * timer event comes in (because the timer fired the event before it was
407    * cancelled), so the connection won't be reset.
408    */
409   _startBackoffTimer() {
410     lazy.console.debug("startBackoffTimer()");
412     // Calculate new timeout, but cap it to pingInterval.
413     let retryTimeout =
414       prefs.getIntPref("retryBaseInterval") * Math.pow(2, this._retryFailCount);
415     retryTimeout = Math.min(retryTimeout, prefs.getIntPref("pingInterval"));
417     this._retryFailCount++;
419     lazy.console.debug(
420       "startBackoffTimer: Retry in",
421       retryTimeout,
422       "Try number",
423       this._retryFailCount
424     );
426     if (!this._backoffTimer) {
427       this._backoffTimer = Cc["@mozilla.org/timer;1"].createInstance(
428         Ci.nsITimer
429       );
430     }
431     this._backoffTimer.init(this, retryTimeout, Ci.nsITimer.TYPE_ONE_SHOT);
432   },
434   /** Indicates whether we're waiting for pongs or requests. */
435   _hasPendingRequests() {
436     return this._lastPingTime > 0 || this._pendingRequests.size > 0;
437   },
439   /**
440    * Starts the request timeout timer unless we're already waiting for a pong
441    * or register request.
442    */
443   _startRequestTimeoutTimer() {
444     if (this._hasPendingRequests()) {
445       return;
446     }
447     if (!this._requestTimeoutTimer) {
448       this._requestTimeoutTimer = Cc["@mozilla.org/timer;1"].createInstance(
449         Ci.nsITimer
450       );
451     }
452     this._requestTimeoutTimer.init(
453       this,
454       this._requestTimeout,
455       Ci.nsITimer.TYPE_REPEATING_SLACK
456     );
457   },
459   /** Starts or resets the ping timer. */
460   _startPingTimer() {
461     if (!this._pingTimer) {
462       this._pingTimer = Cc["@mozilla.org/timer;1"].createInstance(Ci.nsITimer);
463     }
464     this._pingTimer.init(
465       this,
466       prefs.getIntPref("pingInterval"),
467       Ci.nsITimer.TYPE_ONE_SHOT
468     );
469   },
471   _makeWebSocket(uri) {
472     if (!prefs.getBoolPref("connection.enabled")) {
473       lazy.console.warn(
474         "makeWebSocket: connection.enabled is not set to true.",
475         "Aborting."
476       );
477       return null;
478     }
479     if (Services.io.offline) {
480       lazy.console.warn("makeWebSocket: Network is offline.");
481       return null;
482     }
483     let contractId =
484       uri.scheme == "ws"
485         ? "@mozilla.org/network/protocol;1?name=ws"
486         : "@mozilla.org/network/protocol;1?name=wss";
487     let socket = Cc[contractId].createInstance(Ci.nsIWebSocketChannel);
489     socket.initLoadInfo(
490       null, // aLoadingNode
491       Services.scriptSecurityManager.getSystemPrincipal(),
492       null, // aTriggeringPrincipal
493       Ci.nsILoadInfo.SEC_ALLOW_CROSS_ORIGIN_SEC_CONTEXT_IS_NULL,
494       Ci.nsIContentPolicy.TYPE_WEBSOCKET
495     );
496     // Allow deprecated HTTP request from SystemPrincipal
497     socket.loadInfo.allowDeprecatedSystemRequests = true;
499     return socket;
500   },
502   _beginWSSetup() {
503     lazy.console.debug("beginWSSetup()");
504     if (this._currentState != STATE_SHUT_DOWN) {
505       lazy.console.error(
506         "_beginWSSetup: Not in shutdown state! Current state",
507         this._currentState
508       );
509       return;
510     }
512     // Stop any pending reconnects scheduled for the near future.
513     if (this._backoffTimer) {
514       this._backoffTimer.cancel();
515     }
517     let uri = this._serverURI;
518     if (!uri) {
519       return;
520     }
521     let socket = this._makeWebSocket(uri);
522     if (!socket) {
523       return;
524     }
525     this._ws = socket.QueryInterface(Ci.nsIWebSocketChannel);
527     lazy.console.debug("beginWSSetup: Connecting to", uri.spec);
528     this._wsListener = new PushWebSocketListener(this);
529     this._ws.protocol = "push-notification";
531     try {
532       // Grab a wakelock before we open the socket to ensure we don't go to
533       // sleep before connection the is opened.
534       this._ws.asyncOpen(uri, uri.spec, {}, 0, this._wsListener, null);
535       this._currentState = STATE_WAITING_FOR_WS_START;
536     } catch (e) {
537       lazy.console.error(
538         "beginWSSetup: Error opening websocket.",
539         "asyncOpen failed",
540         e
541       );
542       this._reconnect();
543     }
544   },
546   connect(broadcastListeners) {
547     lazy.console.debug("connect()", broadcastListeners);
548     this._broadcastListeners = broadcastListeners;
549     this._beginWSSetup();
550   },
552   isConnected() {
553     return !!this._ws;
554   },
556   /**
557    * Protocol handler invoked by server message.
558    */
559   _handleHelloReply(reply) {
560     lazy.console.debug("handleHelloReply()");
561     if (this._currentState != STATE_WAITING_FOR_HELLO) {
562       lazy.console.error(
563         "handleHelloReply: Unexpected state",
564         this._currentState,
565         "(expected STATE_WAITING_FOR_HELLO)"
566       );
567       this._shutdownWS();
568       return;
569     }
571     if (typeof reply.uaid !== "string") {
572       lazy.console.error("handleHelloReply: Received invalid UAID", reply.uaid);
573       this._shutdownWS();
574       return;
575     }
577     if (reply.uaid === "") {
578       lazy.console.error("handleHelloReply: Received empty UAID");
579       this._shutdownWS();
580       return;
581     }
583     // To avoid sticking extra large values sent by an evil server into prefs.
584     if (reply.uaid.length > 128) {
585       lazy.console.error(
586         "handleHelloReply: UAID received from server was too long",
587         reply.uaid
588       );
589       this._shutdownWS();
590       return;
591     }
593     let sendRequests = () => {
594       if (this._notifyRequestQueue) {
595         this._notifyRequestQueue();
596         this._notifyRequestQueue = null;
597       }
598       this._sendPendingRequests();
599     };
601     function finishHandshake() {
602       this._UAID = reply.uaid;
603       this._currentState = STATE_READY;
604       prefs.addObserver("userAgentID", this);
606       // Handle broadcasts received in response to the "hello" message.
607       if (!lazy.ObjectUtils.isEmpty(reply.broadcasts)) {
608         // The reply isn't technically a broadcast message, but it has
609         // the shape of a broadcast message (it has a broadcasts field).
610         const context = { phase: lazy.pushBroadcastService.PHASES.HELLO };
611         this._mainPushService.receivedBroadcastMessage(reply, context);
612       }
614       this._dataEnabled = !!reply.use_webpush;
615       if (this._dataEnabled) {
616         this._mainPushService
617           .getAllUnexpired()
618           .then(records =>
619             Promise.all(
620               records.map(record =>
621                 this._mainPushService.ensureCrypto(record).catch(error => {
622                   lazy.console.error(
623                     "finishHandshake: Error updating record",
624                     record.keyID,
625                     error
626                   );
627                 })
628               )
629             )
630           )
631           .then(sendRequests);
632       } else {
633         sendRequests();
634       }
635     }
637     // By this point we've got a UAID from the server that we are ready to
638     // accept.
639     //
640     // We unconditionally drop all existing registrations and notify service
641     // workers if we receive a new UAID. This ensures we expunge all stale
642     // registrations if the `userAgentID` pref is reset.
643     if (this._UAID != reply.uaid) {
644       lazy.console.debug("handleHelloReply: Received new UAID");
646       this._mainPushService
647         .dropUnexpiredRegistrations()
648         .then(finishHandshake.bind(this));
650       return;
651     }
653     // otherwise we are good to go
654     finishHandshake.bind(this)();
655   },
657   /**
658    * Protocol handler invoked by server message.
659    */
660   _handleRegisterReply(reply) {
661     lazy.console.debug("handleRegisterReply()");
663     let tmp = this._takeRequestForReply(reply);
664     if (!tmp) {
665       return;
666     }
668     if (reply.status == 200) {
669       try {
670         Services.io.newURI(reply.pushEndpoint);
671       } catch (e) {
672         tmp.reject(new Error("Invalid push endpoint: " + reply.pushEndpoint));
673         return;
674       }
676       let record = new PushRecordWebSocket({
677         channelID: reply.channelID,
678         pushEndpoint: reply.pushEndpoint,
679         scope: tmp.record.scope,
680         originAttributes: tmp.record.originAttributes,
681         version: null,
682         systemRecord: tmp.record.systemRecord,
683         appServerKey: tmp.record.appServerKey,
684         ctime: Date.now(),
685       });
686       tmp.resolve(record);
687     } else {
688       lazy.console.error(
689         "handleRegisterReply: Unexpected server response",
690         reply
691       );
692       tmp.reject(
693         new Error("Wrong status code for register reply: " + reply.status)
694       );
695     }
696   },
698   _handleUnregisterReply(reply) {
699     lazy.console.debug("handleUnregisterReply()");
701     let request = this._takeRequestForReply(reply);
702     if (!request) {
703       return;
704     }
706     let success = reply.status === 200;
707     request.resolve(success);
708   },
710   _handleDataUpdate(update) {
711     let promise;
712     if (typeof update.channelID != "string") {
713       lazy.console.warn(
714         "handleDataUpdate: Discarding update without channel ID",
715         update
716       );
717       return;
718     }
719     function updateRecord(record) {
720       // Ignore messages that we've already processed. This can happen if the
721       // connection drops between notifying the service worker and acking the
722       // the message. In that case, the server will re-send the message on
723       // reconnect.
724       if (record.hasRecentMessageID(update.version)) {
725         lazy.console.warn(
726           "handleDataUpdate: Ignoring duplicate message",
727           update.version
728         );
729         return null;
730       }
731       record.noteRecentMessageID(update.version);
732       return record;
733     }
734     if (typeof update.data != "string") {
735       promise = this._mainPushService.receivedPushMessage(
736         update.channelID,
737         update.version,
738         null,
739         null,
740         updateRecord
741       );
742     } else {
743       let message = ChromeUtils.base64URLDecode(update.data, {
744         // The Push server may append padding.
745         padding: "ignore",
746       });
747       promise = this._mainPushService.receivedPushMessage(
748         update.channelID,
749         update.version,
750         update.headers,
751         message,
752         updateRecord
753       );
754     }
755     promise
756       .then(
757         status => {
758           this._sendAck(update.channelID, update.version, status);
759         },
760         err => {
761           lazy.console.error(
762             "handleDataUpdate: Error delivering message",
763             update,
764             err
765           );
766           this._sendAck(
767             update.channelID,
768             update.version,
769             Ci.nsIPushErrorReporter.ACK_DECRYPTION_ERROR
770           );
771         }
772       )
773       .catch(err => {
774         lazy.console.error(
775           "handleDataUpdate: Error acknowledging message",
776           update,
777           err
778         );
779       });
780   },
782   /**
783    * Protocol handler invoked by server message.
784    */
785   _handleNotificationReply(reply) {
786     lazy.console.debug("handleNotificationReply()");
787     if (this._dataEnabled) {
788       this._handleDataUpdate(reply);
789       return;
790     }
792     if (typeof reply.updates !== "object") {
793       lazy.console.warn(
794         "handleNotificationReply: Missing updates",
795         reply.updates
796       );
797       return;
798     }
800     lazy.console.debug("handleNotificationReply: Got updates", reply.updates);
801     for (let i = 0; i < reply.updates.length; i++) {
802       let update = reply.updates[i];
803       lazy.console.debug("handleNotificationReply: Handling update", update);
804       if (typeof update.channelID !== "string") {
805         lazy.console.debug(
806           "handleNotificationReply: Invalid update at index",
807           i,
808           update
809         );
810         continue;
811       }
813       if (update.version === undefined) {
814         lazy.console.debug("handleNotificationReply: Missing version", update);
815         continue;
816       }
818       let version = update.version;
820       if (typeof version === "string") {
821         version = parseInt(version, 10);
822       }
824       if (typeof version === "number" && version >= 0) {
825         // FIXME(nsm): this relies on app update notification being infallible!
826         // eventually fix this
827         this._receivedUpdate(update.channelID, version);
828       }
829     }
830   },
832   _handleBroadcastReply(reply) {
833     let phase = lazy.pushBroadcastService.PHASES.BROADCAST;
834     // Check if this reply is the result of registration.
835     for (const id of Object.keys(reply.broadcasts)) {
836       const wasRegistering = this._currentlyRegistering.delete(id);
837       if (wasRegistering) {
838         // If we get multiple broadcasts and only one is "registering",
839         // then we consider the phase to be REGISTER for all of them.
840         // It is acceptable since registrations do not happen so often,
841         // and are all very likely to occur soon after browser startup.
842         phase = lazy.pushBroadcastService.PHASES.REGISTER;
843       }
844     }
845     const context = { phase };
846     this._mainPushService.receivedBroadcastMessage(reply, context);
847   },
849   reportDeliveryError(messageID, reason) {
850     lazy.console.debug("reportDeliveryError()");
851     let code = kDELIVERY_REASON_TO_CODE[reason];
852     if (!code) {
853       throw new Error("Invalid delivery error reason");
854     }
855     let data = { messageType: "nack", version: messageID, code };
856     this._queueRequest(data);
857   },
859   _sendAck(channelID, version, status) {
860     lazy.console.debug("sendAck()");
861     let code = kACK_STATUS_TO_CODE[status];
862     if (!code) {
863       throw new Error("Invalid ack status");
864     }
865     let data = { messageType: "ack", updates: [{ channelID, version, code }] };
866     this._queueRequest(data);
867   },
869   _generateID() {
870     // generateUUID() gives a UUID surrounded by {...}, slice them off.
871     return Services.uuid.generateUUID().toString().slice(1, -1);
872   },
874   register(record) {
875     lazy.console.debug("register() ", record);
877     let data = { channelID: this._generateID(), messageType: "register" };
879     if (record.appServerKey) {
880       data.key = ChromeUtils.base64URLEncode(record.appServerKey, {
881         // The Push server requires padding.
882         pad: true,
883       });
884     }
886     return this._sendRequestForReply(record, data).then(record => {
887       if (!this._dataEnabled) {
888         return record;
889       }
890       return PushCrypto.generateKeys().then(([publicKey, privateKey]) => {
891         record.p256dhPublicKey = publicKey;
892         record.p256dhPrivateKey = privateKey;
893         record.authenticationSecret = PushCrypto.generateAuthenticationSecret();
894         return record;
895       });
896     });
897   },
899   unregister(record, reason) {
900     lazy.console.debug("unregister() ", record, reason);
902     return Promise.resolve().then(_ => {
903       let code = kUNREGISTER_REASON_TO_CODE[reason];
904       if (!code) {
905         throw new Error("Invalid unregister reason");
906       }
907       let data = {
908         channelID: record.channelID,
909         messageType: "unregister",
910         code,
911       };
913       return this._sendRequestForReply(record, data);
914     });
915   },
917   _queueStart: Promise.resolve(),
918   _notifyRequestQueue: null,
919   _queue: null,
920   _enqueue(op) {
921     lazy.console.debug("enqueue()");
922     if (!this._queue) {
923       this._queue = this._queueStart;
924     }
925     this._queue = this._queue.then(op).catch(_ => {});
926   },
928   /** Sends a request to the server. */
929   _send(data) {
930     if (this._currentState != STATE_READY) {
931       lazy.console.warn(
932         "send: Unexpected state; ignoring message",
933         this._currentState
934       );
935       return;
936     }
937     if (!this._requestHasReply(data)) {
938       this._wsSendMessage(data);
939       return;
940     }
941     // If we're expecting a reply, check that we haven't cancelled the request.
942     let key = this._makePendingRequestKey(data);
943     if (!this._pendingRequests.has(key)) {
944       lazy.console.log("send: Request cancelled; ignoring message", key);
945       return;
946     }
947     this._wsSendMessage(data);
948   },
950   /** Indicates whether a request has a corresponding reply from the server. */
951   _requestHasReply(data) {
952     return data.messageType == "register" || data.messageType == "unregister";
953   },
955   /**
956    * Sends all pending requests that expect replies. Called after the connection
957    * is established and the handshake is complete.
958    */
959   _sendPendingRequests() {
960     this._enqueue(_ => {
961       for (let request of this._pendingRequests.values()) {
962         this._send(request.data);
963       }
964     });
965   },
967   /** Queues an outgoing request, establishing a connection if necessary. */
968   _queueRequest(data) {
969     lazy.console.debug("queueRequest()", data);
971     if (this._currentState == STATE_READY) {
972       // If we're ready, no need to queue; just send the request.
973       this._send(data);
974       return;
975     }
977     // Otherwise, we're still setting up. If we don't have a request queue,
978     // make one now.
979     if (!this._notifyRequestQueue) {
980       let promise = new Promise((resolve, reject) => {
981         this._notifyRequestQueue = resolve;
982       });
983       this._enqueue(_ => promise);
984     }
986     let isRequest = this._requestHasReply(data);
987     if (!isRequest) {
988       // Don't queue requests, since they're stored in `_pendingRequests`, and
989       // `_sendPendingRequests` will send them after reconnecting. Without this
990       // check, we'd send requests twice.
991       this._enqueue(_ => this._send(data));
992     }
994     if (!this._ws) {
995       // This will end up calling notifyRequestQueue().
996       this._beginWSSetup();
997       // If beginWSSetup does not succeed to make ws, notifyRequestQueue will
998       // not be call.
999       if (!this._ws && this._notifyRequestQueue) {
1000         this._notifyRequestQueue();
1001         this._notifyRequestQueue = null;
1002       }
1003     }
1004   },
1006   _receivedUpdate(aChannelID, aLatestVersion) {
1007     lazy.console.debug(
1008       "receivedUpdate: Updating",
1009       aChannelID,
1010       "->",
1011       aLatestVersion
1012     );
1014     this._mainPushService
1015       .receivedPushMessage(aChannelID, "", null, null, record => {
1016         if (record.version === null || record.version < aLatestVersion) {
1017           lazy.console.debug(
1018             "receivedUpdate: Version changed for",
1019             aChannelID,
1020             aLatestVersion
1021           );
1022           record.version = aLatestVersion;
1023           return record;
1024         }
1025         lazy.console.debug(
1026           "receivedUpdate: No significant version change for",
1027           aChannelID,
1028           aLatestVersion
1029         );
1030         return null;
1031       })
1032       .then(status => {
1033         this._sendAck(aChannelID, aLatestVersion, status);
1034       })
1035       .catch(err => {
1036         lazy.console.error(
1037           "receivedUpdate: Error acknowledging message",
1038           aChannelID,
1039           aLatestVersion,
1040           err
1041         );
1042       });
1043   },
1045   // begin Push protocol handshake
1046   _wsOnStart(context) {
1047     lazy.console.debug("wsOnStart()");
1049     if (this._currentState != STATE_WAITING_FOR_WS_START) {
1050       lazy.console.error(
1051         "wsOnStart: NOT in STATE_WAITING_FOR_WS_START. Current",
1052         "state",
1053         this._currentState,
1054         "Skipping"
1055       );
1056       return;
1057     }
1059     this._mainPushService
1060       .getAllUnexpired()
1061       .then(
1062         records => this._sendHello(records),
1063         err => {
1064           lazy.console.warn(
1065             "Error fetching existing records before handshake; assuming none",
1066             err
1067           );
1068           this._sendHello([]);
1069         }
1070       )
1071       .catch(err => {
1072         // If we failed to send the handshake, back off and reconnect.
1073         lazy.console.warn("Failed to send handshake; reconnecting", err);
1074         this._reconnect();
1075       });
1076   },
1078   /**
1079    * Sends a `hello` handshake to the server.
1080    *
1081    * @param {Array<PushRecordWebSocket>} An array of records for existing
1082    *        subscriptions, used to determine whether to rotate our UAID.
1083    */
1084   _sendHello(records) {
1085     let data = {
1086       messageType: "hello",
1087       broadcasts: this._broadcastListeners,
1088       use_webpush: true,
1089     };
1091     if (records.length && this._UAID) {
1092       // Only send our UAID if we have existing push subscriptions, to
1093       // avoid tying a persistent identifier to the connection (bug
1094       // 1617136). The push server will issue our client a new UAID in
1095       // the `hello` response, which we'll store until either the next
1096       // time we reconnect, or the user subscribes to push. Once we have a
1097       // push subscription, we'll stop rotating the UAID when we connect,
1098       // so that we can receive push messages for them.
1099       data.uaid = this._UAID;
1100     }
1102     this._wsSendMessage(data);
1103     this._currentState = STATE_WAITING_FOR_HELLO;
1104   },
1106   /**
1107    * This statusCode is not the websocket protocol status code, but the TCP
1108    * connection close status code.
1109    *
1110    * If we do not explicitly call ws.close() then statusCode is always
1111    * NS_BASE_STREAM_CLOSED, even on a successful close.
1112    */
1113   _wsOnStop(context, statusCode) {
1114     lazy.console.debug("wsOnStop()");
1116     if (statusCode != Cr.NS_OK && !this._skipReconnect) {
1117       lazy.console.debug(
1118         "wsOnStop: Reconnecting after socket error",
1119         statusCode
1120       );
1121       this._reconnect();
1122       return;
1123     }
1125     this._shutdownWS();
1126   },
1128   _wsOnMessageAvailable(context, message) {
1129     lazy.console.debug("wsOnMessageAvailable()", message);
1131     // Clearing the last ping time indicates we're no longer waiting for a pong.
1132     this._lastPingTime = 0;
1134     let reply;
1135     try {
1136       reply = JSON.parse(message);
1137     } catch (e) {
1138       lazy.console.warn("wsOnMessageAvailable: Invalid JSON", message, e);
1139       return;
1140     }
1142     // If we receive a message, we know the connection succeeded. Reset the
1143     // connection attempt and ping interval counters.
1144     this._retryFailCount = 0;
1146     let doNotHandle = false;
1147     if (
1148       message === "{}" ||
1149       reply.messageType === undefined ||
1150       reply.messageType === "ping" ||
1151       typeof reply.messageType != "string"
1152     ) {
1153       lazy.console.debug("wsOnMessageAvailable: Pong received");
1154       doNotHandle = true;
1155     }
1157     // Reset the ping timer.  Note: This path is executed at every step of the
1158     // handshake, so this timer does not need to be set explicitly at startup.
1159     this._startPingTimer();
1161     // If it is a ping, do not handle the message.
1162     if (doNotHandle) {
1163       if (!this._hasPendingRequests()) {
1164         this._requestTimeoutTimer.cancel();
1165       }
1166       return;
1167     }
1169     // An allowlist of protocol handlers. Add to these if new messages are added
1170     // in the protocol.
1171     let handlers = [
1172       "Hello",
1173       "Register",
1174       "Unregister",
1175       "Notification",
1176       "Broadcast",
1177     ];
1179     // Build up the handler name to call from messageType.
1180     // e.g. messageType == "register" -> _handleRegisterReply.
1181     let handlerName =
1182       reply.messageType[0].toUpperCase() +
1183       reply.messageType.slice(1).toLowerCase();
1185     if (!handlers.includes(handlerName)) {
1186       lazy.console.warn(
1187         "wsOnMessageAvailable: No allowlisted handler",
1188         handlerName,
1189         "for message",
1190         reply.messageType
1191       );
1192       return;
1193     }
1195     let handler = "_handle" + handlerName + "Reply";
1197     if (typeof this[handler] !== "function") {
1198       lazy.console.warn(
1199         "wsOnMessageAvailable: Handler",
1200         handler,
1201         "allowlisted but not implemented"
1202       );
1203       return;
1204     }
1206     this[handler](reply);
1207   },
1209   /**
1210    * The websocket should never be closed. Since we don't call ws.close(),
1211    * _wsOnStop() receives error code NS_BASE_STREAM_CLOSED (see comment in that
1212    * function), which calls reconnect and re-establishes the WebSocket
1213    * connection.
1214    *
1215    * If the server requested that we back off, we won't reconnect until the
1216    * next network state change event, or until we need to send a new register
1217    * request.
1218    */
1219   _wsOnServerClose(context, aStatusCode, aReason) {
1220     lazy.console.debug("wsOnServerClose()", aStatusCode, aReason);
1222     if (aStatusCode == kBACKOFF_WS_STATUS_CODE) {
1223       lazy.console.debug("wsOnServerClose: Skipping automatic reconnect");
1224       this._skipReconnect = true;
1225     }
1226   },
1228   /**
1229    * Rejects all pending register requests with errors.
1230    */
1231   _cancelPendingRequests() {
1232     for (let request of this._pendingRequests.values()) {
1233       request.reject(new Error("Request aborted"));
1234     }
1235     this._pendingRequests.clear();
1236   },
1238   /** Creates a case-insensitive map key for a request that expects a reply. */
1239   _makePendingRequestKey(data) {
1240     return (data.messageType + "|" + data.channelID).toLowerCase();
1241   },
1243   /** Sends a request and waits for a reply from the server. */
1244   _sendRequestForReply(record, data) {
1245     return Promise.resolve().then(_ => {
1246       // start the timer since we now have at least one request
1247       this._startRequestTimeoutTimer();
1249       let key = this._makePendingRequestKey(data);
1250       if (!this._pendingRequests.has(key)) {
1251         let request = {
1252           data,
1253           record,
1254           ctime: Date.now(),
1255         };
1256         request.promise = new Promise((resolve, reject) => {
1257           request.resolve = resolve;
1258           request.reject = reject;
1259         });
1260         this._pendingRequests.set(key, request);
1261         this._queueRequest(data);
1262       }
1264       return this._pendingRequests.get(key).promise;
1265     });
1266   },
1268   /** Removes and returns a pending request for a server reply. */
1269   _takeRequestForReply(reply) {
1270     if (typeof reply.channelID !== "string") {
1271       return null;
1272     }
1273     let key = this._makePendingRequestKey(reply);
1274     let request = this._pendingRequests.get(key);
1275     if (!request) {
1276       return null;
1277     }
1278     this._pendingRequests.delete(key);
1279     if (!this._hasPendingRequests()) {
1280       this._requestTimeoutTimer.cancel();
1281     }
1282     return request;
1283   },
1285   sendSubscribeBroadcast(serviceId, version) {
1286     this._currentlyRegistering.add(serviceId);
1287     let data = {
1288       messageType: "broadcast_subscribe",
1289       broadcasts: {
1290         [serviceId]: version,
1291       },
1292     };
1294     this._queueRequest(data);
1295   },
1298 function PushRecordWebSocket(record) {
1299   PushRecord.call(this, record);
1300   this.channelID = record.channelID;
1301   this.version = record.version;
1304 PushRecordWebSocket.prototype = Object.create(PushRecord.prototype, {
1305   keyID: {
1306     get() {
1307       return this.channelID;
1308     },
1309   },
1312 PushRecordWebSocket.prototype.toSubscription = function () {
1313   let subscription = PushRecord.prototype.toSubscription.call(this);
1314   subscription.version = this.version;
1315   return subscription;