Merge mozilla-central to autoland on a CLOSED TREE
[gecko.git] / dom / push / PushServiceWebSocket.sys.mjs
blob7792fc5fea8a210941423faf34dcda0b2b48f698
1 /* This Source Code Form is subject to the terms of the Mozilla Public
2  * License, v. 2.0. If a copy of the MPL was not distributed with this file,
3  * You can obtain one at http://mozilla.org/MPL/2.0/. */
5 import { XPCOMUtils } from "resource://gre/modules/XPCOMUtils.sys.mjs";
7 import { PushDB } from "resource://gre/modules/PushDB.sys.mjs";
8 import { PushRecord } from "resource://gre/modules/PushRecord.sys.mjs";
9 import { PushCrypto } from "resource://gre/modules/PushCrypto.sys.mjs";
11 const lazy = {};
13 ChromeUtils.defineESModuleGetters(lazy, {
14   ObjectUtils: "resource://gre/modules/ObjectUtils.sys.mjs",
15   pushBroadcastService: "resource://gre/modules/PushBroadcastService.sys.mjs",
16 });
18 const kPUSHWSDB_DB_NAME = "pushapi";
19 const kPUSHWSDB_DB_VERSION = 5; // Change this if the IndexedDB format changes
20 const kPUSHWSDB_STORE_NAME = "pushapi";
22 // WebSocket close code sent by the server to indicate that the client should
23 // not automatically reconnect.
24 const kBACKOFF_WS_STATUS_CODE = 4774;
26 // Maps ack statuses, unsubscribe reasons, and delivery error reasons to codes
27 // included in request payloads.
28 const kACK_STATUS_TO_CODE = {
29   [Ci.nsIPushErrorReporter.ACK_DELIVERED]: 100,
30   [Ci.nsIPushErrorReporter.ACK_DECRYPTION_ERROR]: 101,
31   [Ci.nsIPushErrorReporter.ACK_NOT_DELIVERED]: 102,
34 const kUNREGISTER_REASON_TO_CODE = {
35   [Ci.nsIPushErrorReporter.UNSUBSCRIBE_MANUAL]: 200,
36   [Ci.nsIPushErrorReporter.UNSUBSCRIBE_QUOTA_EXCEEDED]: 201,
37   [Ci.nsIPushErrorReporter.UNSUBSCRIBE_PERMISSION_REVOKED]: 202,
40 const kDELIVERY_REASON_TO_CODE = {
41   [Ci.nsIPushErrorReporter.DELIVERY_UNCAUGHT_EXCEPTION]: 301,
42   [Ci.nsIPushErrorReporter.DELIVERY_UNHANDLED_REJECTION]: 302,
43   [Ci.nsIPushErrorReporter.DELIVERY_INTERNAL_ERROR]: 303,
46 const prefs = Services.prefs.getBranch("dom.push.");
48 XPCOMUtils.defineLazyGetter(lazy, "console", () => {
49   let { ConsoleAPI } = ChromeUtils.importESModule(
50     "resource://gre/modules/Console.sys.mjs"
51   );
52   return new ConsoleAPI({
53     maxLogLevelPref: "dom.push.loglevel",
54     prefix: "PushServiceWebSocket",
55   });
56 });
58 /**
59  * A proxy between the PushService and the WebSocket. The listener is used so
60  * that the PushService can silence messages from the WebSocket by setting
61  * PushWebSocketListener._pushService to null. This is required because
62  * a WebSocket can continue to send messages or errors after it has been
63  * closed but the PushService may not be interested in these. It's easier to
64  * stop listening than to have checks at specific points.
65  */
66 var PushWebSocketListener = function (pushService) {
67   this._pushService = pushService;
70 PushWebSocketListener.prototype = {
71   onStart(context) {
72     if (!this._pushService) {
73       return;
74     }
75     this._pushService._wsOnStart(context);
76   },
78   onStop(context, statusCode) {
79     if (!this._pushService) {
80       return;
81     }
82     this._pushService._wsOnStop(context, statusCode);
83   },
85   onAcknowledge(context, size) {
86     // EMPTY
87   },
89   onBinaryMessageAvailable(context, message) {
90     // EMPTY
91   },
93   onMessageAvailable(context, message) {
94     if (!this._pushService) {
95       return;
96     }
97     this._pushService._wsOnMessageAvailable(context, message);
98   },
100   onServerClose(context, aStatusCode, aReason) {
101     if (!this._pushService) {
102       return;
103     }
104     this._pushService._wsOnServerClose(context, aStatusCode, aReason);
105   },
108 // websocket states
109 // websocket is off
110 const STATE_SHUT_DOWN = 0;
111 // Websocket has been opened on client side, waiting for successful open.
112 // (_wsOnStart)
113 const STATE_WAITING_FOR_WS_START = 1;
114 // Websocket opened, hello sent, waiting for server reply (_handleHelloReply).
115 const STATE_WAITING_FOR_HELLO = 2;
116 // Websocket operational, handshake completed, begin protocol messaging.
117 const STATE_READY = 3;
119 export var PushServiceWebSocket = {
120   QueryInterface: ChromeUtils.generateQI(["nsINamed", "nsIObserver"]),
121   name: "PushServiceWebSocket",
123   _mainPushService: null,
124   _serverURI: null,
125   _currentlyRegistering: new Set(),
127   newPushDB() {
128     return new PushDB(
129       kPUSHWSDB_DB_NAME,
130       kPUSHWSDB_DB_VERSION,
131       kPUSHWSDB_STORE_NAME,
132       "channelID",
133       PushRecordWebSocket
134     );
135   },
137   disconnect() {
138     this._shutdownWS();
139   },
141   observe(aSubject, aTopic, aData) {
142     if (aTopic == "nsPref:changed" && aData == "dom.push.userAgentID") {
143       this._onUAIDChanged();
144     } else if (aTopic == "timer-callback") {
145       this._onTimerFired(aSubject);
146     }
147   },
149   /**
150    * Handles a UAID change. Unlike reconnects, we cancel all pending requests
151    * after disconnecting. Existing subscriptions stored in IndexedDB will be
152    * dropped on reconnect.
153    */
154   _onUAIDChanged() {
155     lazy.console.debug("onUAIDChanged()");
157     this._shutdownWS();
158     this._startBackoffTimer();
159   },
161   /** Handles a ping, backoff, or request timeout timer event. */
162   _onTimerFired(timer) {
163     lazy.console.debug("onTimerFired()");
165     if (timer == this._pingTimer) {
166       this._sendPing();
167       return;
168     }
170     if (timer == this._backoffTimer) {
171       lazy.console.debug("onTimerFired: Reconnecting after backoff");
172       this._beginWSSetup();
173       return;
174     }
176     if (timer == this._requestTimeoutTimer) {
177       this._timeOutRequests();
178     }
179   },
181   /**
182    * Sends a ping to the server. Bypasses the request queue, but starts the
183    * request timeout timer. If the socket is already closed, or the server
184    * does not respond within the timeout, the client will reconnect.
185    */
186   _sendPing() {
187     lazy.console.debug("sendPing()");
189     this._startRequestTimeoutTimer();
190     try {
191       this._wsSendMessage({});
192       this._lastPingTime = Date.now();
193     } catch (e) {
194       lazy.console.debug("sendPing: Error sending ping", e);
195       this._reconnect();
196     }
197   },
199   /** Times out any pending requests. */
200   _timeOutRequests() {
201     lazy.console.debug("timeOutRequests()");
203     if (!this._hasPendingRequests()) {
204       // Cancel the repeating timer and exit early if we aren't waiting for
205       // pongs or requests.
206       this._requestTimeoutTimer.cancel();
207       return;
208     }
210     let now = Date.now();
212     // Set to true if at least one request timed out, or we're still waiting
213     // for a pong after the request timeout.
214     let requestTimedOut = false;
216     if (
217       this._lastPingTime > 0 &&
218       now - this._lastPingTime > this._requestTimeout
219     ) {
220       lazy.console.debug("timeOutRequests: Did not receive pong in time");
221       requestTimedOut = true;
222     } else {
223       for (let [key, request] of this._pendingRequests) {
224         let duration = now - request.ctime;
225         // If any of the registration requests time out, all the ones after it
226         // also made to fail, since we are going to be disconnecting the
227         // socket.
228         requestTimedOut |= duration > this._requestTimeout;
229         if (requestTimedOut) {
230           request.reject(new Error("Request timed out: " + key));
231           this._pendingRequests.delete(key);
232         }
233       }
234     }
236     // The most likely reason for a pong or registration request timing out is
237     // that the socket has disconnected. Best to reconnect.
238     if (requestTimedOut) {
239       this._reconnect();
240     }
241   },
243   get _UAID() {
244     return prefs.getStringPref("userAgentID");
245   },
247   set _UAID(newID) {
248     if (typeof newID !== "string") {
249       lazy.console.warn(
250         "Got invalid, non-string UAID",
251         newID,
252         "Not updating userAgentID"
253       );
254       return;
255     }
256     lazy.console.debug("New _UAID", newID);
257     prefs.setStringPref("userAgentID", newID);
258   },
260   _ws: null,
261   _pendingRequests: new Map(),
262   _currentState: STATE_SHUT_DOWN,
263   _requestTimeout: 0,
264   _requestTimeoutTimer: null,
265   _retryFailCount: 0,
267   /**
268    * According to the WS spec, servers should immediately close the underlying
269    * TCP connection after they close a WebSocket. This causes wsOnStop to be
270    * called with error NS_BASE_STREAM_CLOSED. Since the client has to keep the
271    * WebSocket up, it should try to reconnect. But if the server closes the
272    * WebSocket because it wants the client to back off, then the client
273    * shouldn't re-establish the connection. If the server sends the backoff
274    * close code, this field will be set to true in wsOnServerClose. It is
275    * checked in wsOnStop.
276    */
277   _skipReconnect: false,
279   /** Indicates whether the server supports Web Push-style message delivery. */
280   _dataEnabled: false,
282   /**
283    * The last time the client sent a ping to the server. If non-zero, keeps the
284    * request timeout timer active. Reset to zero when the server responds with
285    * a pong or pending messages.
286    */
287   _lastPingTime: 0,
289   /**
290    * A one-shot timer used to ping the server, to avoid timing out idle
291    * connections. Reset to the ping interval on each incoming message.
292    */
293   _pingTimer: null,
295   /** A one-shot timer fired after the reconnect backoff period. */
296   _backoffTimer: null,
298   /**
299    * Sends a message to the Push Server through an open websocket.
300    * typeof(msg) shall be an object
301    */
302   _wsSendMessage(msg) {
303     if (!this._ws) {
304       lazy.console.warn(
305         "wsSendMessage: No WebSocket initialized.",
306         "Cannot send a message"
307       );
308       return;
309     }
310     msg = JSON.stringify(msg);
311     lazy.console.debug("wsSendMessage: Sending message", msg);
312     this._ws.sendMsg(msg);
313   },
315   init(options, mainPushService, serverURI) {
316     lazy.console.debug("init()");
318     this._mainPushService = mainPushService;
319     this._serverURI = serverURI;
320     // Filled in at connect() time
321     this._broadcastListeners = null;
323     // Override the default WebSocket factory function. The returned object
324     // must be null or satisfy the nsIWebSocketChannel interface. Used by
325     // the tests to provide a mock WebSocket implementation.
326     if (options.makeWebSocket) {
327       this._makeWebSocket = options.makeWebSocket;
328     }
330     this._requestTimeout = prefs.getIntPref("requestTimeout");
332     return Promise.resolve();
333   },
335   _reconnect() {
336     lazy.console.debug("reconnect()");
337     this._shutdownWS(false);
338     this._startBackoffTimer();
339   },
341   _shutdownWS(shouldCancelPending = true) {
342     lazy.console.debug("shutdownWS()");
344     if (this._currentState == STATE_READY) {
345       prefs.removeObserver("userAgentID", this);
346     }
348     this._currentState = STATE_SHUT_DOWN;
349     this._skipReconnect = false;
351     if (this._wsListener) {
352       this._wsListener._pushService = null;
353     }
354     try {
355       this._ws.close(0, null);
356     } catch (e) {}
357     this._ws = null;
359     this._lastPingTime = 0;
361     if (this._pingTimer) {
362       this._pingTimer.cancel();
363     }
365     if (shouldCancelPending) {
366       this._cancelPendingRequests();
367     }
369     if (this._notifyRequestQueue) {
370       this._notifyRequestQueue();
371       this._notifyRequestQueue = null;
372     }
373   },
375   uninit() {
376     // All pending requests (ideally none) are dropped at this point. We
377     // shouldn't have any applications performing registration/unregistration
378     // or receiving notifications.
379     this._shutdownWS();
381     if (this._backoffTimer) {
382       this._backoffTimer.cancel();
383     }
384     if (this._requestTimeoutTimer) {
385       this._requestTimeoutTimer.cancel();
386     }
388     this._mainPushService = null;
390     this._dataEnabled = false;
391   },
393   /**
394    * How retries work: If the WS is closed due to a socket error,
395    * _startBackoffTimer() is called. The retry timer is started and when
396    * it times out, beginWSSetup() is called again.
397    *
398    * If we are in the middle of a timeout (i.e. waiting), but
399    * a register/unregister is called, we don't want to wait around anymore.
400    * _sendRequest will automatically call beginWSSetup(), which will cancel the
401    * timer. In addition since the state will have changed, even if a pending
402    * timer event comes in (because the timer fired the event before it was
403    * cancelled), so the connection won't be reset.
404    */
405   _startBackoffTimer() {
406     lazy.console.debug("startBackoffTimer()");
408     // Calculate new timeout, but cap it to pingInterval.
409     let retryTimeout =
410       prefs.getIntPref("retryBaseInterval") * Math.pow(2, this._retryFailCount);
411     retryTimeout = Math.min(retryTimeout, prefs.getIntPref("pingInterval"));
413     this._retryFailCount++;
415     lazy.console.debug(
416       "startBackoffTimer: Retry in",
417       retryTimeout,
418       "Try number",
419       this._retryFailCount
420     );
422     if (!this._backoffTimer) {
423       this._backoffTimer = Cc["@mozilla.org/timer;1"].createInstance(
424         Ci.nsITimer
425       );
426     }
427     this._backoffTimer.init(this, retryTimeout, Ci.nsITimer.TYPE_ONE_SHOT);
428   },
430   /** Indicates whether we're waiting for pongs or requests. */
431   _hasPendingRequests() {
432     return this._lastPingTime > 0 || this._pendingRequests.size > 0;
433   },
435   /**
436    * Starts the request timeout timer unless we're already waiting for a pong
437    * or register request.
438    */
439   _startRequestTimeoutTimer() {
440     if (this._hasPendingRequests()) {
441       return;
442     }
443     if (!this._requestTimeoutTimer) {
444       this._requestTimeoutTimer = Cc["@mozilla.org/timer;1"].createInstance(
445         Ci.nsITimer
446       );
447     }
448     this._requestTimeoutTimer.init(
449       this,
450       this._requestTimeout,
451       Ci.nsITimer.TYPE_REPEATING_SLACK
452     );
453   },
455   /** Starts or resets the ping timer. */
456   _startPingTimer() {
457     if (!this._pingTimer) {
458       this._pingTimer = Cc["@mozilla.org/timer;1"].createInstance(Ci.nsITimer);
459     }
460     this._pingTimer.init(
461       this,
462       prefs.getIntPref("pingInterval"),
463       Ci.nsITimer.TYPE_ONE_SHOT
464     );
465   },
467   _makeWebSocket(uri) {
468     if (!prefs.getBoolPref("connection.enabled")) {
469       lazy.console.warn(
470         "makeWebSocket: connection.enabled is not set to true.",
471         "Aborting."
472       );
473       return null;
474     }
475     if (Services.io.offline) {
476       lazy.console.warn("makeWebSocket: Network is offline.");
477       return null;
478     }
479     let contractId =
480       uri.scheme == "ws"
481         ? "@mozilla.org/network/protocol;1?name=ws"
482         : "@mozilla.org/network/protocol;1?name=wss";
483     let socket = Cc[contractId].createInstance(Ci.nsIWebSocketChannel);
485     socket.initLoadInfo(
486       null, // aLoadingNode
487       Services.scriptSecurityManager.getSystemPrincipal(),
488       null, // aTriggeringPrincipal
489       Ci.nsILoadInfo.SEC_ALLOW_CROSS_ORIGIN_SEC_CONTEXT_IS_NULL,
490       Ci.nsIContentPolicy.TYPE_WEBSOCKET
491     );
492     // Allow deprecated HTTP request from SystemPrincipal
493     socket.loadInfo.allowDeprecatedSystemRequests = true;
495     return socket;
496   },
498   _beginWSSetup() {
499     lazy.console.debug("beginWSSetup()");
500     if (this._currentState != STATE_SHUT_DOWN) {
501       lazy.console.error(
502         "_beginWSSetup: Not in shutdown state! Current state",
503         this._currentState
504       );
505       return;
506     }
508     // Stop any pending reconnects scheduled for the near future.
509     if (this._backoffTimer) {
510       this._backoffTimer.cancel();
511     }
513     let uri = this._serverURI;
514     if (!uri) {
515       return;
516     }
517     let socket = this._makeWebSocket(uri);
518     if (!socket) {
519       return;
520     }
521     this._ws = socket.QueryInterface(Ci.nsIWebSocketChannel);
523     lazy.console.debug("beginWSSetup: Connecting to", uri.spec);
524     this._wsListener = new PushWebSocketListener(this);
525     this._ws.protocol = "push-notification";
527     try {
528       // Grab a wakelock before we open the socket to ensure we don't go to
529       // sleep before connection the is opened.
530       this._ws.asyncOpen(uri, uri.spec, {}, 0, this._wsListener, null);
531       this._currentState = STATE_WAITING_FOR_WS_START;
532     } catch (e) {
533       lazy.console.error(
534         "beginWSSetup: Error opening websocket.",
535         "asyncOpen failed",
536         e
537       );
538       this._reconnect();
539     }
540   },
542   connect(broadcastListeners) {
543     lazy.console.debug("connect()", broadcastListeners);
544     this._broadcastListeners = broadcastListeners;
545     this._beginWSSetup();
546   },
548   isConnected() {
549     return !!this._ws;
550   },
552   /**
553    * Protocol handler invoked by server message.
554    */
555   _handleHelloReply(reply) {
556     lazy.console.debug("handleHelloReply()");
557     if (this._currentState != STATE_WAITING_FOR_HELLO) {
558       lazy.console.error(
559         "handleHelloReply: Unexpected state",
560         this._currentState,
561         "(expected STATE_WAITING_FOR_HELLO)"
562       );
563       this._shutdownWS();
564       return;
565     }
567     if (typeof reply.uaid !== "string") {
568       lazy.console.error("handleHelloReply: Received invalid UAID", reply.uaid);
569       this._shutdownWS();
570       return;
571     }
573     if (reply.uaid === "") {
574       lazy.console.error("handleHelloReply: Received empty UAID");
575       this._shutdownWS();
576       return;
577     }
579     // To avoid sticking extra large values sent by an evil server into prefs.
580     if (reply.uaid.length > 128) {
581       lazy.console.error(
582         "handleHelloReply: UAID received from server was too long",
583         reply.uaid
584       );
585       this._shutdownWS();
586       return;
587     }
589     let sendRequests = () => {
590       if (this._notifyRequestQueue) {
591         this._notifyRequestQueue();
592         this._notifyRequestQueue = null;
593       }
594       this._sendPendingRequests();
595     };
597     function finishHandshake() {
598       this._UAID = reply.uaid;
599       this._currentState = STATE_READY;
600       prefs.addObserver("userAgentID", this);
602       // Handle broadcasts received in response to the "hello" message.
603       if (!lazy.ObjectUtils.isEmpty(reply.broadcasts)) {
604         // The reply isn't technically a broadcast message, but it has
605         // the shape of a broadcast message (it has a broadcasts field).
606         const context = { phase: lazy.pushBroadcastService.PHASES.HELLO };
607         this._mainPushService.receivedBroadcastMessage(reply, context);
608       }
610       this._dataEnabled = !!reply.use_webpush;
611       if (this._dataEnabled) {
612         this._mainPushService
613           .getAllUnexpired()
614           .then(records =>
615             Promise.all(
616               records.map(record =>
617                 this._mainPushService.ensureCrypto(record).catch(error => {
618                   lazy.console.error(
619                     "finishHandshake: Error updating record",
620                     record.keyID,
621                     error
622                   );
623                 })
624               )
625             )
626           )
627           .then(sendRequests);
628       } else {
629         sendRequests();
630       }
631     }
633     // By this point we've got a UAID from the server that we are ready to
634     // accept.
635     //
636     // We unconditionally drop all existing registrations and notify service
637     // workers if we receive a new UAID. This ensures we expunge all stale
638     // registrations if the `userAgentID` pref is reset.
639     if (this._UAID != reply.uaid) {
640       lazy.console.debug("handleHelloReply: Received new UAID");
642       this._mainPushService
643         .dropUnexpiredRegistrations()
644         .then(finishHandshake.bind(this));
646       return;
647     }
649     // otherwise we are good to go
650     finishHandshake.bind(this)();
651   },
653   /**
654    * Protocol handler invoked by server message.
655    */
656   _handleRegisterReply(reply) {
657     lazy.console.debug("handleRegisterReply()");
659     let tmp = this._takeRequestForReply(reply);
660     if (!tmp) {
661       return;
662     }
664     if (reply.status == 200) {
665       try {
666         Services.io.newURI(reply.pushEndpoint);
667       } catch (e) {
668         tmp.reject(new Error("Invalid push endpoint: " + reply.pushEndpoint));
669         return;
670       }
672       let record = new PushRecordWebSocket({
673         channelID: reply.channelID,
674         pushEndpoint: reply.pushEndpoint,
675         scope: tmp.record.scope,
676         originAttributes: tmp.record.originAttributes,
677         version: null,
678         systemRecord: tmp.record.systemRecord,
679         appServerKey: tmp.record.appServerKey,
680         ctime: Date.now(),
681       });
682       tmp.resolve(record);
683     } else {
684       lazy.console.error(
685         "handleRegisterReply: Unexpected server response",
686         reply
687       );
688       tmp.reject(
689         new Error("Wrong status code for register reply: " + reply.status)
690       );
691     }
692   },
694   _handleUnregisterReply(reply) {
695     lazy.console.debug("handleUnregisterReply()");
697     let request = this._takeRequestForReply(reply);
698     if (!request) {
699       return;
700     }
702     let success = reply.status === 200;
703     request.resolve(success);
704   },
706   _handleDataUpdate(update) {
707     let promise;
708     if (typeof update.channelID != "string") {
709       lazy.console.warn(
710         "handleDataUpdate: Discarding update without channel ID",
711         update
712       );
713       return;
714     }
715     function updateRecord(record) {
716       // Ignore messages that we've already processed. This can happen if the
717       // connection drops between notifying the service worker and acking the
718       // the message. In that case, the server will re-send the message on
719       // reconnect.
720       if (record.hasRecentMessageID(update.version)) {
721         lazy.console.warn(
722           "handleDataUpdate: Ignoring duplicate message",
723           update.version
724         );
725         return null;
726       }
727       record.noteRecentMessageID(update.version);
728       return record;
729     }
730     if (typeof update.data != "string") {
731       promise = this._mainPushService.receivedPushMessage(
732         update.channelID,
733         update.version,
734         null,
735         null,
736         updateRecord
737       );
738     } else {
739       let message = ChromeUtils.base64URLDecode(update.data, {
740         // The Push server may append padding.
741         padding: "ignore",
742       });
743       promise = this._mainPushService.receivedPushMessage(
744         update.channelID,
745         update.version,
746         update.headers,
747         message,
748         updateRecord
749       );
750     }
751     promise
752       .then(
753         status => {
754           this._sendAck(update.channelID, update.version, status);
755         },
756         err => {
757           lazy.console.error(
758             "handleDataUpdate: Error delivering message",
759             update,
760             err
761           );
762           this._sendAck(
763             update.channelID,
764             update.version,
765             Ci.nsIPushErrorReporter.ACK_DECRYPTION_ERROR
766           );
767         }
768       )
769       .catch(err => {
770         lazy.console.error(
771           "handleDataUpdate: Error acknowledging message",
772           update,
773           err
774         );
775       });
776   },
778   /**
779    * Protocol handler invoked by server message.
780    */
781   _handleNotificationReply(reply) {
782     lazy.console.debug("handleNotificationReply()");
783     if (this._dataEnabled) {
784       this._handleDataUpdate(reply);
785       return;
786     }
788     if (typeof reply.updates !== "object") {
789       lazy.console.warn(
790         "handleNotificationReply: Missing updates",
791         reply.updates
792       );
793       return;
794     }
796     lazy.console.debug("handleNotificationReply: Got updates", reply.updates);
797     for (let i = 0; i < reply.updates.length; i++) {
798       let update = reply.updates[i];
799       lazy.console.debug("handleNotificationReply: Handling update", update);
800       if (typeof update.channelID !== "string") {
801         lazy.console.debug(
802           "handleNotificationReply: Invalid update at index",
803           i,
804           update
805         );
806         continue;
807       }
809       if (update.version === undefined) {
810         lazy.console.debug("handleNotificationReply: Missing version", update);
811         continue;
812       }
814       let version = update.version;
816       if (typeof version === "string") {
817         version = parseInt(version, 10);
818       }
820       if (typeof version === "number" && version >= 0) {
821         // FIXME(nsm): this relies on app update notification being infallible!
822         // eventually fix this
823         this._receivedUpdate(update.channelID, version);
824       }
825     }
826   },
828   _handleBroadcastReply(reply) {
829     let phase = lazy.pushBroadcastService.PHASES.BROADCAST;
830     // Check if this reply is the result of registration.
831     for (const id of Object.keys(reply.broadcasts)) {
832       const wasRegistering = this._currentlyRegistering.delete(id);
833       if (wasRegistering) {
834         // If we get multiple broadcasts and only one is "registering",
835         // then we consider the phase to be REGISTER for all of them.
836         // It is acceptable since registrations do not happen so often,
837         // and are all very likely to occur soon after browser startup.
838         phase = lazy.pushBroadcastService.PHASES.REGISTER;
839       }
840     }
841     const context = { phase };
842     this._mainPushService.receivedBroadcastMessage(reply, context);
843   },
845   reportDeliveryError(messageID, reason) {
846     lazy.console.debug("reportDeliveryError()");
847     let code = kDELIVERY_REASON_TO_CODE[reason];
848     if (!code) {
849       throw new Error("Invalid delivery error reason");
850     }
851     let data = { messageType: "nack", version: messageID, code };
852     this._queueRequest(data);
853   },
855   _sendAck(channelID, version, status) {
856     lazy.console.debug("sendAck()");
857     let code = kACK_STATUS_TO_CODE[status];
858     if (!code) {
859       throw new Error("Invalid ack status");
860     }
861     let data = { messageType: "ack", updates: [{ channelID, version, code }] };
862     this._queueRequest(data);
863   },
865   _generateID() {
866     // generateUUID() gives a UUID surrounded by {...}, slice them off.
867     return Services.uuid.generateUUID().toString().slice(1, -1);
868   },
870   register(record) {
871     lazy.console.debug("register() ", record);
873     let data = { channelID: this._generateID(), messageType: "register" };
875     if (record.appServerKey) {
876       data.key = ChromeUtils.base64URLEncode(record.appServerKey, {
877         // The Push server requires padding.
878         pad: true,
879       });
880     }
882     return this._sendRequestForReply(record, data).then(record => {
883       if (!this._dataEnabled) {
884         return record;
885       }
886       return PushCrypto.generateKeys().then(([publicKey, privateKey]) => {
887         record.p256dhPublicKey = publicKey;
888         record.p256dhPrivateKey = privateKey;
889         record.authenticationSecret = PushCrypto.generateAuthenticationSecret();
890         return record;
891       });
892     });
893   },
895   unregister(record, reason) {
896     lazy.console.debug("unregister() ", record, reason);
898     return Promise.resolve().then(_ => {
899       let code = kUNREGISTER_REASON_TO_CODE[reason];
900       if (!code) {
901         throw new Error("Invalid unregister reason");
902       }
903       let data = {
904         channelID: record.channelID,
905         messageType: "unregister",
906         code,
907       };
909       return this._sendRequestForReply(record, data);
910     });
911   },
913   _queueStart: Promise.resolve(),
914   _notifyRequestQueue: null,
915   _queue: null,
916   _enqueue(op) {
917     lazy.console.debug("enqueue()");
918     if (!this._queue) {
919       this._queue = this._queueStart;
920     }
921     this._queue = this._queue.then(op).catch(_ => {});
922   },
924   /** Sends a request to the server. */
925   _send(data) {
926     if (this._currentState != STATE_READY) {
927       lazy.console.warn(
928         "send: Unexpected state; ignoring message",
929         this._currentState
930       );
931       return;
932     }
933     if (!this._requestHasReply(data)) {
934       this._wsSendMessage(data);
935       return;
936     }
937     // If we're expecting a reply, check that we haven't cancelled the request.
938     let key = this._makePendingRequestKey(data);
939     if (!this._pendingRequests.has(key)) {
940       lazy.console.log("send: Request cancelled; ignoring message", key);
941       return;
942     }
943     this._wsSendMessage(data);
944   },
946   /** Indicates whether a request has a corresponding reply from the server. */
947   _requestHasReply(data) {
948     return data.messageType == "register" || data.messageType == "unregister";
949   },
951   /**
952    * Sends all pending requests that expect replies. Called after the connection
953    * is established and the handshake is complete.
954    */
955   _sendPendingRequests() {
956     this._enqueue(_ => {
957       for (let request of this._pendingRequests.values()) {
958         this._send(request.data);
959       }
960     });
961   },
963   /** Queues an outgoing request, establishing a connection if necessary. */
964   _queueRequest(data) {
965     lazy.console.debug("queueRequest()", data);
967     if (this._currentState == STATE_READY) {
968       // If we're ready, no need to queue; just send the request.
969       this._send(data);
970       return;
971     }
973     // Otherwise, we're still setting up. If we don't have a request queue,
974     // make one now.
975     if (!this._notifyRequestQueue) {
976       let promise = new Promise((resolve, reject) => {
977         this._notifyRequestQueue = resolve;
978       });
979       this._enqueue(_ => promise);
980     }
982     let isRequest = this._requestHasReply(data);
983     if (!isRequest) {
984       // Don't queue requests, since they're stored in `_pendingRequests`, and
985       // `_sendPendingRequests` will send them after reconnecting. Without this
986       // check, we'd send requests twice.
987       this._enqueue(_ => this._send(data));
988     }
990     if (!this._ws) {
991       // This will end up calling notifyRequestQueue().
992       this._beginWSSetup();
993       // If beginWSSetup does not succeed to make ws, notifyRequestQueue will
994       // not be call.
995       if (!this._ws && this._notifyRequestQueue) {
996         this._notifyRequestQueue();
997         this._notifyRequestQueue = null;
998       }
999     }
1000   },
1002   _receivedUpdate(aChannelID, aLatestVersion) {
1003     lazy.console.debug(
1004       "receivedUpdate: Updating",
1005       aChannelID,
1006       "->",
1007       aLatestVersion
1008     );
1010     this._mainPushService
1011       .receivedPushMessage(aChannelID, "", null, null, record => {
1012         if (record.version === null || record.version < aLatestVersion) {
1013           lazy.console.debug(
1014             "receivedUpdate: Version changed for",
1015             aChannelID,
1016             aLatestVersion
1017           );
1018           record.version = aLatestVersion;
1019           return record;
1020         }
1021         lazy.console.debug(
1022           "receivedUpdate: No significant version change for",
1023           aChannelID,
1024           aLatestVersion
1025         );
1026         return null;
1027       })
1028       .then(status => {
1029         this._sendAck(aChannelID, aLatestVersion, status);
1030       })
1031       .catch(err => {
1032         lazy.console.error(
1033           "receivedUpdate: Error acknowledging message",
1034           aChannelID,
1035           aLatestVersion,
1036           err
1037         );
1038       });
1039   },
1041   // begin Push protocol handshake
1042   _wsOnStart(context) {
1043     lazy.console.debug("wsOnStart()");
1045     if (this._currentState != STATE_WAITING_FOR_WS_START) {
1046       lazy.console.error(
1047         "wsOnStart: NOT in STATE_WAITING_FOR_WS_START. Current",
1048         "state",
1049         this._currentState,
1050         "Skipping"
1051       );
1052       return;
1053     }
1055     this._mainPushService
1056       .getAllUnexpired()
1057       .then(
1058         records => this._sendHello(records),
1059         err => {
1060           lazy.console.warn(
1061             "Error fetching existing records before handshake; assuming none",
1062             err
1063           );
1064           this._sendHello([]);
1065         }
1066       )
1067       .catch(err => {
1068         // If we failed to send the handshake, back off and reconnect.
1069         lazy.console.warn("Failed to send handshake; reconnecting", err);
1070         this._reconnect();
1071       });
1072   },
1074   /**
1075    * Sends a `hello` handshake to the server.
1076    *
1077    * @param {Array<PushRecordWebSocket>} An array of records for existing
1078    *        subscriptions, used to determine whether to rotate our UAID.
1079    */
1080   _sendHello(records) {
1081     let data = {
1082       messageType: "hello",
1083       broadcasts: this._broadcastListeners,
1084       use_webpush: true,
1085     };
1087     if (records.length && this._UAID) {
1088       // Only send our UAID if we have existing push subscriptions, to
1089       // avoid tying a persistent identifier to the connection (bug
1090       // 1617136). The push server will issue our client a new UAID in
1091       // the `hello` response, which we'll store until either the next
1092       // time we reconnect, or the user subscribes to push. Once we have a
1093       // push subscription, we'll stop rotating the UAID when we connect,
1094       // so that we can receive push messages for them.
1095       data.uaid = this._UAID;
1096     }
1098     this._wsSendMessage(data);
1099     this._currentState = STATE_WAITING_FOR_HELLO;
1100   },
1102   /**
1103    * This statusCode is not the websocket protocol status code, but the TCP
1104    * connection close status code.
1105    *
1106    * If we do not explicitly call ws.close() then statusCode is always
1107    * NS_BASE_STREAM_CLOSED, even on a successful close.
1108    */
1109   _wsOnStop(context, statusCode) {
1110     lazy.console.debug("wsOnStop()");
1112     if (statusCode != Cr.NS_OK && !this._skipReconnect) {
1113       lazy.console.debug(
1114         "wsOnStop: Reconnecting after socket error",
1115         statusCode
1116       );
1117       this._reconnect();
1118       return;
1119     }
1121     this._shutdownWS();
1122   },
1124   _wsOnMessageAvailable(context, message) {
1125     lazy.console.debug("wsOnMessageAvailable()", message);
1127     // Clearing the last ping time indicates we're no longer waiting for a pong.
1128     this._lastPingTime = 0;
1130     let reply;
1131     try {
1132       reply = JSON.parse(message);
1133     } catch (e) {
1134       lazy.console.warn("wsOnMessageAvailable: Invalid JSON", message, e);
1135       return;
1136     }
1138     // If we receive a message, we know the connection succeeded. Reset the
1139     // connection attempt and ping interval counters.
1140     this._retryFailCount = 0;
1142     let doNotHandle = false;
1143     if (
1144       message === "{}" ||
1145       reply.messageType === undefined ||
1146       reply.messageType === "ping" ||
1147       typeof reply.messageType != "string"
1148     ) {
1149       lazy.console.debug("wsOnMessageAvailable: Pong received");
1150       doNotHandle = true;
1151     }
1153     // Reset the ping timer.  Note: This path is executed at every step of the
1154     // handshake, so this timer does not need to be set explicitly at startup.
1155     this._startPingTimer();
1157     // If it is a ping, do not handle the message.
1158     if (doNotHandle) {
1159       if (!this._hasPendingRequests()) {
1160         this._requestTimeoutTimer.cancel();
1161       }
1162       return;
1163     }
1165     // An allowlist of protocol handlers. Add to these if new messages are added
1166     // in the protocol.
1167     let handlers = [
1168       "Hello",
1169       "Register",
1170       "Unregister",
1171       "Notification",
1172       "Broadcast",
1173     ];
1175     // Build up the handler name to call from messageType.
1176     // e.g. messageType == "register" -> _handleRegisterReply.
1177     let handlerName =
1178       reply.messageType[0].toUpperCase() +
1179       reply.messageType.slice(1).toLowerCase();
1181     if (!handlers.includes(handlerName)) {
1182       lazy.console.warn(
1183         "wsOnMessageAvailable: No allowlisted handler",
1184         handlerName,
1185         "for message",
1186         reply.messageType
1187       );
1188       return;
1189     }
1191     let handler = "_handle" + handlerName + "Reply";
1193     if (typeof this[handler] !== "function") {
1194       lazy.console.warn(
1195         "wsOnMessageAvailable: Handler",
1196         handler,
1197         "allowlisted but not implemented"
1198       );
1199       return;
1200     }
1202     this[handler](reply);
1203   },
1205   /**
1206    * The websocket should never be closed. Since we don't call ws.close(),
1207    * _wsOnStop() receives error code NS_BASE_STREAM_CLOSED (see comment in that
1208    * function), which calls reconnect and re-establishes the WebSocket
1209    * connection.
1210    *
1211    * If the server requested that we back off, we won't reconnect until the
1212    * next network state change event, or until we need to send a new register
1213    * request.
1214    */
1215   _wsOnServerClose(context, aStatusCode, aReason) {
1216     lazy.console.debug("wsOnServerClose()", aStatusCode, aReason);
1218     if (aStatusCode == kBACKOFF_WS_STATUS_CODE) {
1219       lazy.console.debug("wsOnServerClose: Skipping automatic reconnect");
1220       this._skipReconnect = true;
1221     }
1222   },
1224   /**
1225    * Rejects all pending register requests with errors.
1226    */
1227   _cancelPendingRequests() {
1228     for (let request of this._pendingRequests.values()) {
1229       request.reject(new Error("Request aborted"));
1230     }
1231     this._pendingRequests.clear();
1232   },
1234   /** Creates a case-insensitive map key for a request that expects a reply. */
1235   _makePendingRequestKey(data) {
1236     return (data.messageType + "|" + data.channelID).toLowerCase();
1237   },
1239   /** Sends a request and waits for a reply from the server. */
1240   _sendRequestForReply(record, data) {
1241     return Promise.resolve().then(_ => {
1242       // start the timer since we now have at least one request
1243       this._startRequestTimeoutTimer();
1245       let key = this._makePendingRequestKey(data);
1246       if (!this._pendingRequests.has(key)) {
1247         let request = {
1248           data,
1249           record,
1250           ctime: Date.now(),
1251         };
1252         request.promise = new Promise((resolve, reject) => {
1253           request.resolve = resolve;
1254           request.reject = reject;
1255         });
1256         this._pendingRequests.set(key, request);
1257         this._queueRequest(data);
1258       }
1260       return this._pendingRequests.get(key).promise;
1261     });
1262   },
1264   /** Removes and returns a pending request for a server reply. */
1265   _takeRequestForReply(reply) {
1266     if (typeof reply.channelID !== "string") {
1267       return null;
1268     }
1269     let key = this._makePendingRequestKey(reply);
1270     let request = this._pendingRequests.get(key);
1271     if (!request) {
1272       return null;
1273     }
1274     this._pendingRequests.delete(key);
1275     if (!this._hasPendingRequests()) {
1276       this._requestTimeoutTimer.cancel();
1277     }
1278     return request;
1279   },
1281   sendSubscribeBroadcast(serviceId, version) {
1282     this._currentlyRegistering.add(serviceId);
1283     let data = {
1284       messageType: "broadcast_subscribe",
1285       broadcasts: {
1286         [serviceId]: version,
1287       },
1288     };
1290     this._queueRequest(data);
1291   },
1294 function PushRecordWebSocket(record) {
1295   PushRecord.call(this, record);
1296   this.channelID = record.channelID;
1297   this.version = record.version;
1300 PushRecordWebSocket.prototype = Object.create(PushRecord.prototype, {
1301   keyID: {
1302     get() {
1303       return this.channelID;
1304     },
1305   },
1308 PushRecordWebSocket.prototype.toSubscription = function () {
1309   let subscription = PushRecord.prototype.toSubscription.call(this);
1310   subscription.version = this.version;
1311   return subscription;