Bug 1881621 - Add colors/color_canvas.html tests to dom/canvas/test/reftest. r=bradwerth
[gecko.git] / dom / push / PushServiceWebSocket.sys.mjs
blob59e8f403f181aec6889f8dd62e6ca3654807bb67
1 /* This Source Code Form is subject to the terms of the Mozilla Public
2  * License, v. 2.0. If a copy of the MPL was not distributed with this file,
3  * You can obtain one at http://mozilla.org/MPL/2.0/. */
5 import { PushDB } from "resource://gre/modules/PushDB.sys.mjs";
6 import { PushRecord } from "resource://gre/modules/PushRecord.sys.mjs";
7 import { PushCrypto } from "resource://gre/modules/PushCrypto.sys.mjs";
9 const lazy = {};
11 ChromeUtils.defineESModuleGetters(lazy, {
12   ObjectUtils: "resource://gre/modules/ObjectUtils.sys.mjs",
13   pushBroadcastService: "resource://gre/modules/PushBroadcastService.sys.mjs",
14 });
16 const kPUSHWSDB_DB_NAME = "pushapi";
17 const kPUSHWSDB_DB_VERSION = 5; // Change this if the IndexedDB format changes
18 const kPUSHWSDB_STORE_NAME = "pushapi";
20 // WebSocket close code sent by the server to indicate that the client should
21 // not automatically reconnect.
22 const kBACKOFF_WS_STATUS_CODE = 4774;
24 // Maps ack statuses, unsubscribe reasons, and delivery error reasons to codes
25 // included in request payloads.
26 const kACK_STATUS_TO_CODE = {
27   [Ci.nsIPushErrorReporter.ACK_DELIVERED]: 100,
28   [Ci.nsIPushErrorReporter.ACK_DECRYPTION_ERROR]: 101,
29   [Ci.nsIPushErrorReporter.ACK_NOT_DELIVERED]: 102,
32 const kUNREGISTER_REASON_TO_CODE = {
33   [Ci.nsIPushErrorReporter.UNSUBSCRIBE_MANUAL]: 200,
34   [Ci.nsIPushErrorReporter.UNSUBSCRIBE_QUOTA_EXCEEDED]: 201,
35   [Ci.nsIPushErrorReporter.UNSUBSCRIBE_PERMISSION_REVOKED]: 202,
38 const kDELIVERY_REASON_TO_CODE = {
39   [Ci.nsIPushErrorReporter.DELIVERY_UNCAUGHT_EXCEPTION]: 301,
40   [Ci.nsIPushErrorReporter.DELIVERY_UNHANDLED_REJECTION]: 302,
41   [Ci.nsIPushErrorReporter.DELIVERY_INTERNAL_ERROR]: 303,
44 const prefs = Services.prefs.getBranch("dom.push.");
46 ChromeUtils.defineLazyGetter(lazy, "console", () => {
47   let { ConsoleAPI } = ChromeUtils.importESModule(
48     "resource://gre/modules/Console.sys.mjs"
49   );
50   return new ConsoleAPI({
51     maxLogLevelPref: "dom.push.loglevel",
52     prefix: "PushServiceWebSocket",
53   });
54 });
56 /**
57  * A proxy between the PushService and the WebSocket. The listener is used so
58  * that the PushService can silence messages from the WebSocket by setting
59  * PushWebSocketListener._pushService to null. This is required because
60  * a WebSocket can continue to send messages or errors after it has been
61  * closed but the PushService may not be interested in these. It's easier to
62  * stop listening than to have checks at specific points.
63  */
64 var PushWebSocketListener = function (pushService) {
65   this._pushService = pushService;
68 PushWebSocketListener.prototype = {
69   onStart(context) {
70     if (!this._pushService) {
71       return;
72     }
73     this._pushService._wsOnStart(context);
74   },
76   onStop(context, statusCode) {
77     if (!this._pushService) {
78       return;
79     }
80     this._pushService._wsOnStop(context, statusCode);
81   },
83   onAcknowledge() {
84     // EMPTY
85   },
87   onBinaryMessageAvailable() {
88     // EMPTY
89   },
91   onMessageAvailable(context, message) {
92     if (!this._pushService) {
93       return;
94     }
95     this._pushService._wsOnMessageAvailable(context, message);
96   },
98   onServerClose(context, aStatusCode, aReason) {
99     if (!this._pushService) {
100       return;
101     }
102     this._pushService._wsOnServerClose(context, aStatusCode, aReason);
103   },
106 // websocket states
107 // websocket is off
108 const STATE_SHUT_DOWN = 0;
109 // Websocket has been opened on client side, waiting for successful open.
110 // (_wsOnStart)
111 const STATE_WAITING_FOR_WS_START = 1;
112 // Websocket opened, hello sent, waiting for server reply (_handleHelloReply).
113 const STATE_WAITING_FOR_HELLO = 2;
114 // Websocket operational, handshake completed, begin protocol messaging.
115 const STATE_READY = 3;
117 export var PushServiceWebSocket = {
118   QueryInterface: ChromeUtils.generateQI(["nsINamed", "nsIObserver"]),
119   name: "PushServiceWebSocket",
121   _mainPushService: null,
122   _serverURI: null,
123   _currentlyRegistering: new Set(),
125   newPushDB() {
126     return new PushDB(
127       kPUSHWSDB_DB_NAME,
128       kPUSHWSDB_DB_VERSION,
129       kPUSHWSDB_STORE_NAME,
130       "channelID",
131       PushRecordWebSocket
132     );
133   },
135   disconnect() {
136     this._shutdownWS();
137   },
139   observe(aSubject, aTopic, aData) {
140     if (aTopic == "nsPref:changed" && aData == "dom.push.userAgentID") {
141       this._onUAIDChanged();
142     } else if (aTopic == "timer-callback") {
143       this._onTimerFired(aSubject);
144     }
145   },
147   /**
148    * Handles a UAID change. Unlike reconnects, we cancel all pending requests
149    * after disconnecting. Existing subscriptions stored in IndexedDB will be
150    * dropped on reconnect.
151    */
152   _onUAIDChanged() {
153     lazy.console.debug("onUAIDChanged()");
155     this._shutdownWS();
156     this._startBackoffTimer();
157   },
159   /** Handles a ping, backoff, or request timeout timer event. */
160   _onTimerFired(timer) {
161     lazy.console.debug("onTimerFired()");
163     if (timer == this._pingTimer) {
164       this._sendPing();
165       return;
166     }
168     if (timer == this._backoffTimer) {
169       lazy.console.debug("onTimerFired: Reconnecting after backoff");
170       this._beginWSSetup();
171       return;
172     }
174     if (timer == this._requestTimeoutTimer) {
175       this._timeOutRequests();
176     }
177   },
179   /**
180    * Sends a ping to the server. Bypasses the request queue, but starts the
181    * request timeout timer. If the socket is already closed, or the server
182    * does not respond within the timeout, the client will reconnect.
183    */
184   _sendPing() {
185     lazy.console.debug("sendPing()");
187     this._startRequestTimeoutTimer();
188     try {
189       this._wsSendMessage({});
190       this._lastPingTime = Date.now();
191     } catch (e) {
192       lazy.console.debug("sendPing: Error sending ping", e);
193       this._reconnect();
194     }
195   },
197   /** Times out any pending requests. */
198   _timeOutRequests() {
199     lazy.console.debug("timeOutRequests()");
201     if (!this._hasPendingRequests()) {
202       // Cancel the repeating timer and exit early if we aren't waiting for
203       // pongs or requests.
204       this._requestTimeoutTimer.cancel();
205       return;
206     }
208     let now = Date.now();
210     // Set to true if at least one request timed out, or we're still waiting
211     // for a pong after the request timeout.
212     let requestTimedOut = false;
214     if (
215       this._lastPingTime > 0 &&
216       now - this._lastPingTime > this._requestTimeout
217     ) {
218       lazy.console.debug("timeOutRequests: Did not receive pong in time");
219       requestTimedOut = true;
220     } else {
221       for (let [key, request] of this._pendingRequests) {
222         let duration = now - request.ctime;
223         // If any of the registration requests time out, all the ones after it
224         // also made to fail, since we are going to be disconnecting the
225         // socket.
226         requestTimedOut |= duration > this._requestTimeout;
227         if (requestTimedOut) {
228           request.reject(new Error("Request timed out: " + key));
229           this._pendingRequests.delete(key);
230         }
231       }
232     }
234     // The most likely reason for a pong or registration request timing out is
235     // that the socket has disconnected. Best to reconnect.
236     if (requestTimedOut) {
237       this._reconnect();
238     }
239   },
241   get _UAID() {
242     return prefs.getStringPref("userAgentID");
243   },
245   set _UAID(newID) {
246     if (typeof newID !== "string") {
247       lazy.console.warn(
248         "Got invalid, non-string UAID",
249         newID,
250         "Not updating userAgentID"
251       );
252       return;
253     }
254     lazy.console.debug("New _UAID", newID);
255     prefs.setStringPref("userAgentID", newID);
256   },
258   _ws: null,
259   _pendingRequests: new Map(),
260   _currentState: STATE_SHUT_DOWN,
261   _requestTimeout: 0,
262   _requestTimeoutTimer: null,
263   _retryFailCount: 0,
265   /**
266    * According to the WS spec, servers should immediately close the underlying
267    * TCP connection after they close a WebSocket. This causes wsOnStop to be
268    * called with error NS_BASE_STREAM_CLOSED. Since the client has to keep the
269    * WebSocket up, it should try to reconnect. But if the server closes the
270    * WebSocket because it wants the client to back off, then the client
271    * shouldn't re-establish the connection. If the server sends the backoff
272    * close code, this field will be set to true in wsOnServerClose. It is
273    * checked in wsOnStop.
274    */
275   _skipReconnect: false,
277   /** Indicates whether the server supports Web Push-style message delivery. */
278   _dataEnabled: false,
280   /**
281    * The last time the client sent a ping to the server. If non-zero, keeps the
282    * request timeout timer active. Reset to zero when the server responds with
283    * a pong or pending messages.
284    */
285   _lastPingTime: 0,
287   /**
288    * A one-shot timer used to ping the server, to avoid timing out idle
289    * connections. Reset to the ping interval on each incoming message.
290    */
291   _pingTimer: null,
293   /** A one-shot timer fired after the reconnect backoff period. */
294   _backoffTimer: null,
296   /**
297    * Sends a message to the Push Server through an open websocket.
298    * typeof(msg) shall be an object
299    */
300   _wsSendMessage(msg) {
301     if (!this._ws) {
302       lazy.console.warn(
303         "wsSendMessage: No WebSocket initialized.",
304         "Cannot send a message"
305       );
306       return;
307     }
308     msg = JSON.stringify(msg);
309     lazy.console.debug("wsSendMessage: Sending message", msg);
310     this._ws.sendMsg(msg);
311   },
313   init(options, mainPushService, serverURI) {
314     lazy.console.debug("init()");
316     this._mainPushService = mainPushService;
317     this._serverURI = serverURI;
318     // Filled in at connect() time
319     this._broadcastListeners = null;
321     // Override the default WebSocket factory function. The returned object
322     // must be null or satisfy the nsIWebSocketChannel interface. Used by
323     // the tests to provide a mock WebSocket implementation.
324     if (options.makeWebSocket) {
325       this._makeWebSocket = options.makeWebSocket;
326     }
328     this._requestTimeout = prefs.getIntPref("requestTimeout");
330     return Promise.resolve();
331   },
333   _reconnect() {
334     lazy.console.debug("reconnect()");
335     this._shutdownWS(false);
336     this._startBackoffTimer();
337   },
339   _shutdownWS(shouldCancelPending = true) {
340     lazy.console.debug("shutdownWS()");
342     if (this._currentState == STATE_READY) {
343       prefs.removeObserver("userAgentID", this);
344     }
346     this._currentState = STATE_SHUT_DOWN;
347     this._skipReconnect = false;
349     if (this._wsListener) {
350       this._wsListener._pushService = null;
351     }
352     try {
353       this._ws.close(0, null);
354     } catch (e) {}
355     this._ws = null;
357     this._lastPingTime = 0;
359     if (this._pingTimer) {
360       this._pingTimer.cancel();
361     }
363     if (shouldCancelPending) {
364       this._cancelPendingRequests();
365     }
367     if (this._notifyRequestQueue) {
368       this._notifyRequestQueue();
369       this._notifyRequestQueue = null;
370     }
371   },
373   uninit() {
374     // All pending requests (ideally none) are dropped at this point. We
375     // shouldn't have any applications performing registration/unregistration
376     // or receiving notifications.
377     this._shutdownWS();
379     if (this._backoffTimer) {
380       this._backoffTimer.cancel();
381     }
382     if (this._requestTimeoutTimer) {
383       this._requestTimeoutTimer.cancel();
384     }
386     this._mainPushService = null;
388     this._dataEnabled = false;
389   },
391   /**
392    * How retries work: If the WS is closed due to a socket error,
393    * _startBackoffTimer() is called. The retry timer is started and when
394    * it times out, beginWSSetup() is called again.
395    *
396    * If we are in the middle of a timeout (i.e. waiting), but
397    * a register/unregister is called, we don't want to wait around anymore.
398    * _sendRequest will automatically call beginWSSetup(), which will cancel the
399    * timer. In addition since the state will have changed, even if a pending
400    * timer event comes in (because the timer fired the event before it was
401    * cancelled), so the connection won't be reset.
402    */
403   _startBackoffTimer() {
404     lazy.console.debug("startBackoffTimer()");
406     // Calculate new timeout, but cap it to pingInterval.
407     let retryTimeout =
408       prefs.getIntPref("retryBaseInterval") * Math.pow(2, this._retryFailCount);
409     retryTimeout = Math.min(retryTimeout, prefs.getIntPref("pingInterval"));
411     this._retryFailCount++;
413     lazy.console.debug(
414       "startBackoffTimer: Retry in",
415       retryTimeout,
416       "Try number",
417       this._retryFailCount
418     );
420     if (!this._backoffTimer) {
421       this._backoffTimer = Cc["@mozilla.org/timer;1"].createInstance(
422         Ci.nsITimer
423       );
424     }
425     this._backoffTimer.init(this, retryTimeout, Ci.nsITimer.TYPE_ONE_SHOT);
426   },
428   /** Indicates whether we're waiting for pongs or requests. */
429   _hasPendingRequests() {
430     return this._lastPingTime > 0 || this._pendingRequests.size > 0;
431   },
433   /**
434    * Starts the request timeout timer unless we're already waiting for a pong
435    * or register request.
436    */
437   _startRequestTimeoutTimer() {
438     if (this._hasPendingRequests()) {
439       return;
440     }
441     if (!this._requestTimeoutTimer) {
442       this._requestTimeoutTimer = Cc["@mozilla.org/timer;1"].createInstance(
443         Ci.nsITimer
444       );
445     }
446     this._requestTimeoutTimer.init(
447       this,
448       this._requestTimeout,
449       Ci.nsITimer.TYPE_REPEATING_SLACK
450     );
451   },
453   /** Starts or resets the ping timer. */
454   _startPingTimer() {
455     if (!this._pingTimer) {
456       this._pingTimer = Cc["@mozilla.org/timer;1"].createInstance(Ci.nsITimer);
457     }
458     this._pingTimer.init(
459       this,
460       prefs.getIntPref("pingInterval"),
461       Ci.nsITimer.TYPE_ONE_SHOT
462     );
463   },
465   _makeWebSocket(uri) {
466     if (!prefs.getBoolPref("connection.enabled")) {
467       lazy.console.warn(
468         "makeWebSocket: connection.enabled is not set to true.",
469         "Aborting."
470       );
471       return null;
472     }
473     if (Services.io.offline) {
474       lazy.console.warn("makeWebSocket: Network is offline.");
475       return null;
476     }
477     let contractId =
478       uri.scheme == "ws"
479         ? "@mozilla.org/network/protocol;1?name=ws"
480         : "@mozilla.org/network/protocol;1?name=wss";
481     let socket = Cc[contractId].createInstance(Ci.nsIWebSocketChannel);
483     socket.initLoadInfo(
484       null, // aLoadingNode
485       Services.scriptSecurityManager.getSystemPrincipal(),
486       null, // aTriggeringPrincipal
487       Ci.nsILoadInfo.SEC_ALLOW_CROSS_ORIGIN_SEC_CONTEXT_IS_NULL,
488       Ci.nsIContentPolicy.TYPE_WEBSOCKET
489     );
490     // Allow deprecated HTTP request from SystemPrincipal
491     socket.loadInfo.allowDeprecatedSystemRequests = true;
493     return socket;
494   },
496   _beginWSSetup() {
497     lazy.console.debug("beginWSSetup()");
498     if (this._currentState != STATE_SHUT_DOWN) {
499       lazy.console.error(
500         "_beginWSSetup: Not in shutdown state! Current state",
501         this._currentState
502       );
503       return;
504     }
506     // Stop any pending reconnects scheduled for the near future.
507     if (this._backoffTimer) {
508       this._backoffTimer.cancel();
509     }
511     let uri = this._serverURI;
512     if (!uri) {
513       return;
514     }
515     let socket = this._makeWebSocket(uri);
516     if (!socket) {
517       return;
518     }
519     this._ws = socket.QueryInterface(Ci.nsIWebSocketChannel);
521     lazy.console.debug("beginWSSetup: Connecting to", uri.spec);
522     this._wsListener = new PushWebSocketListener(this);
523     this._ws.protocol = "push-notification";
525     try {
526       // Grab a wakelock before we open the socket to ensure we don't go to
527       // sleep before connection the is opened.
528       this._ws.asyncOpen(uri, uri.spec, {}, 0, this._wsListener, null);
529       this._currentState = STATE_WAITING_FOR_WS_START;
530     } catch (e) {
531       lazy.console.error(
532         "beginWSSetup: Error opening websocket.",
533         "asyncOpen failed",
534         e
535       );
536       this._reconnect();
537     }
538   },
540   connect(broadcastListeners) {
541     lazy.console.debug("connect()", broadcastListeners);
542     this._broadcastListeners = broadcastListeners;
543     this._beginWSSetup();
544   },
546   isConnected() {
547     return !!this._ws;
548   },
550   /**
551    * Protocol handler invoked by server message.
552    */
553   _handleHelloReply(reply) {
554     lazy.console.debug("handleHelloReply()");
555     if (this._currentState != STATE_WAITING_FOR_HELLO) {
556       lazy.console.error(
557         "handleHelloReply: Unexpected state",
558         this._currentState,
559         "(expected STATE_WAITING_FOR_HELLO)"
560       );
561       this._shutdownWS();
562       return;
563     }
565     if (typeof reply.uaid !== "string") {
566       lazy.console.error("handleHelloReply: Received invalid UAID", reply.uaid);
567       this._shutdownWS();
568       return;
569     }
571     if (reply.uaid === "") {
572       lazy.console.error("handleHelloReply: Received empty UAID");
573       this._shutdownWS();
574       return;
575     }
577     // To avoid sticking extra large values sent by an evil server into prefs.
578     if (reply.uaid.length > 128) {
579       lazy.console.error(
580         "handleHelloReply: UAID received from server was too long",
581         reply.uaid
582       );
583       this._shutdownWS();
584       return;
585     }
587     let sendRequests = () => {
588       if (this._notifyRequestQueue) {
589         this._notifyRequestQueue();
590         this._notifyRequestQueue = null;
591       }
592       this._sendPendingRequests();
593     };
595     function finishHandshake() {
596       this._UAID = reply.uaid;
597       this._currentState = STATE_READY;
598       prefs.addObserver("userAgentID", this);
600       // Handle broadcasts received in response to the "hello" message.
601       if (!lazy.ObjectUtils.isEmpty(reply.broadcasts)) {
602         // The reply isn't technically a broadcast message, but it has
603         // the shape of a broadcast message (it has a broadcasts field).
604         const context = { phase: lazy.pushBroadcastService.PHASES.HELLO };
605         this._mainPushService.receivedBroadcastMessage(reply, context);
606       }
608       this._dataEnabled = !!reply.use_webpush;
609       if (this._dataEnabled) {
610         this._mainPushService
611           .getAllUnexpired()
612           .then(records =>
613             Promise.all(
614               records.map(record =>
615                 this._mainPushService.ensureCrypto(record).catch(error => {
616                   lazy.console.error(
617                     "finishHandshake: Error updating record",
618                     record.keyID,
619                     error
620                   );
621                 })
622               )
623             )
624           )
625           .then(sendRequests);
626       } else {
627         sendRequests();
628       }
629     }
631     // By this point we've got a UAID from the server that we are ready to
632     // accept.
633     //
634     // We unconditionally drop all existing registrations and notify service
635     // workers if we receive a new UAID. This ensures we expunge all stale
636     // registrations if the `userAgentID` pref is reset.
637     if (this._UAID != reply.uaid) {
638       lazy.console.debug("handleHelloReply: Received new UAID");
640       this._mainPushService
641         .dropUnexpiredRegistrations()
642         .then(finishHandshake.bind(this));
644       return;
645     }
647     // otherwise we are good to go
648     finishHandshake.bind(this)();
649   },
651   /**
652    * Protocol handler invoked by server message.
653    */
654   _handleRegisterReply(reply) {
655     lazy.console.debug("handleRegisterReply()");
657     let tmp = this._takeRequestForReply(reply);
658     if (!tmp) {
659       return;
660     }
662     if (reply.status == 200) {
663       try {
664         Services.io.newURI(reply.pushEndpoint);
665       } catch (e) {
666         tmp.reject(new Error("Invalid push endpoint: " + reply.pushEndpoint));
667         return;
668       }
670       let record = new PushRecordWebSocket({
671         channelID: reply.channelID,
672         pushEndpoint: reply.pushEndpoint,
673         scope: tmp.record.scope,
674         originAttributes: tmp.record.originAttributes,
675         version: null,
676         systemRecord: tmp.record.systemRecord,
677         appServerKey: tmp.record.appServerKey,
678         ctime: Date.now(),
679       });
680       tmp.resolve(record);
681     } else {
682       lazy.console.error(
683         "handleRegisterReply: Unexpected server response",
684         reply
685       );
686       tmp.reject(
687         new Error("Wrong status code for register reply: " + reply.status)
688       );
689     }
690   },
692   _handleUnregisterReply(reply) {
693     lazy.console.debug("handleUnregisterReply()");
695     let request = this._takeRequestForReply(reply);
696     if (!request) {
697       return;
698     }
700     let success = reply.status === 200;
701     request.resolve(success);
702   },
704   _handleDataUpdate(update) {
705     let promise;
706     if (typeof update.channelID != "string") {
707       lazy.console.warn(
708         "handleDataUpdate: Discarding update without channel ID",
709         update
710       );
711       return;
712     }
713     function updateRecord(record) {
714       // Ignore messages that we've already processed. This can happen if the
715       // connection drops between notifying the service worker and acking the
716       // the message. In that case, the server will re-send the message on
717       // reconnect.
718       if (record.hasRecentMessageID(update.version)) {
719         lazy.console.warn(
720           "handleDataUpdate: Ignoring duplicate message",
721           update.version
722         );
723         return null;
724       }
725       record.noteRecentMessageID(update.version);
726       return record;
727     }
728     if (typeof update.data != "string") {
729       promise = this._mainPushService.receivedPushMessage(
730         update.channelID,
731         update.version,
732         null,
733         null,
734         updateRecord
735       );
736     } else {
737       let message = ChromeUtils.base64URLDecode(update.data, {
738         // The Push server may append padding.
739         padding: "ignore",
740       });
741       promise = this._mainPushService.receivedPushMessage(
742         update.channelID,
743         update.version,
744         update.headers,
745         message,
746         updateRecord
747       );
748     }
749     promise
750       .then(
751         status => {
752           this._sendAck(update.channelID, update.version, status);
753         },
754         err => {
755           lazy.console.error(
756             "handleDataUpdate: Error delivering message",
757             update,
758             err
759           );
760           this._sendAck(
761             update.channelID,
762             update.version,
763             Ci.nsIPushErrorReporter.ACK_DECRYPTION_ERROR
764           );
765         }
766       )
767       .catch(err => {
768         lazy.console.error(
769           "handleDataUpdate: Error acknowledging message",
770           update,
771           err
772         );
773       });
774   },
776   /**
777    * Protocol handler invoked by server message.
778    */
779   _handleNotificationReply(reply) {
780     lazy.console.debug("handleNotificationReply()");
781     if (this._dataEnabled) {
782       this._handleDataUpdate(reply);
783       return;
784     }
786     if (typeof reply.updates !== "object") {
787       lazy.console.warn(
788         "handleNotificationReply: Missing updates",
789         reply.updates
790       );
791       return;
792     }
794     lazy.console.debug("handleNotificationReply: Got updates", reply.updates);
795     for (let i = 0; i < reply.updates.length; i++) {
796       let update = reply.updates[i];
797       lazy.console.debug("handleNotificationReply: Handling update", update);
798       if (typeof update.channelID !== "string") {
799         lazy.console.debug(
800           "handleNotificationReply: Invalid update at index",
801           i,
802           update
803         );
804         continue;
805       }
807       if (update.version === undefined) {
808         lazy.console.debug("handleNotificationReply: Missing version", update);
809         continue;
810       }
812       let version = update.version;
814       if (typeof version === "string") {
815         version = parseInt(version, 10);
816       }
818       if (typeof version === "number" && version >= 0) {
819         // FIXME(nsm): this relies on app update notification being infallible!
820         // eventually fix this
821         this._receivedUpdate(update.channelID, version);
822       }
823     }
824   },
826   _handleBroadcastReply(reply) {
827     let phase = lazy.pushBroadcastService.PHASES.BROADCAST;
828     // Check if this reply is the result of registration.
829     for (const id of Object.keys(reply.broadcasts)) {
830       const wasRegistering = this._currentlyRegistering.delete(id);
831       if (wasRegistering) {
832         // If we get multiple broadcasts and only one is "registering",
833         // then we consider the phase to be REGISTER for all of them.
834         // It is acceptable since registrations do not happen so often,
835         // and are all very likely to occur soon after browser startup.
836         phase = lazy.pushBroadcastService.PHASES.REGISTER;
837       }
838     }
839     const context = { phase };
840     this._mainPushService.receivedBroadcastMessage(reply, context);
841   },
843   reportDeliveryError(messageID, reason) {
844     lazy.console.debug("reportDeliveryError()");
845     let code = kDELIVERY_REASON_TO_CODE[reason];
846     if (!code) {
847       throw new Error("Invalid delivery error reason");
848     }
849     let data = { messageType: "nack", version: messageID, code };
850     this._queueRequest(data);
851   },
853   _sendAck(channelID, version, status) {
854     lazy.console.debug("sendAck()");
855     let code = kACK_STATUS_TO_CODE[status];
856     if (!code) {
857       throw new Error("Invalid ack status");
858     }
859     let data = { messageType: "ack", updates: [{ channelID, version, code }] };
860     this._queueRequest(data);
861   },
863   _generateID() {
864     // generateUUID() gives a UUID surrounded by {...}, slice them off.
865     return Services.uuid.generateUUID().toString().slice(1, -1);
866   },
868   register(record) {
869     lazy.console.debug("register() ", record);
871     let data = { channelID: this._generateID(), messageType: "register" };
873     if (record.appServerKey) {
874       data.key = ChromeUtils.base64URLEncode(record.appServerKey, {
875         // The Push server requires padding.
876         pad: true,
877       });
878     }
880     return this._sendRequestForReply(record, data).then(record => {
881       if (!this._dataEnabled) {
882         return record;
883       }
884       return PushCrypto.generateKeys().then(([publicKey, privateKey]) => {
885         record.p256dhPublicKey = publicKey;
886         record.p256dhPrivateKey = privateKey;
887         record.authenticationSecret = PushCrypto.generateAuthenticationSecret();
888         return record;
889       });
890     });
891   },
893   unregister(record, reason) {
894     lazy.console.debug("unregister() ", record, reason);
896     return Promise.resolve().then(_ => {
897       let code = kUNREGISTER_REASON_TO_CODE[reason];
898       if (!code) {
899         throw new Error("Invalid unregister reason");
900       }
901       let data = {
902         channelID: record.channelID,
903         messageType: "unregister",
904         code,
905       };
907       return this._sendRequestForReply(record, data);
908     });
909   },
911   _queueStart: Promise.resolve(),
912   _notifyRequestQueue: null,
913   _queue: null,
914   _enqueue(op) {
915     lazy.console.debug("enqueue()");
916     if (!this._queue) {
917       this._queue = this._queueStart;
918     }
919     this._queue = this._queue.then(op).catch(_ => {});
920   },
922   /** Sends a request to the server. */
923   _send(data) {
924     if (this._currentState != STATE_READY) {
925       lazy.console.warn(
926         "send: Unexpected state; ignoring message",
927         this._currentState
928       );
929       return;
930     }
931     if (!this._requestHasReply(data)) {
932       this._wsSendMessage(data);
933       return;
934     }
935     // If we're expecting a reply, check that we haven't cancelled the request.
936     let key = this._makePendingRequestKey(data);
937     if (!this._pendingRequests.has(key)) {
938       lazy.console.log("send: Request cancelled; ignoring message", key);
939       return;
940     }
941     this._wsSendMessage(data);
942   },
944   /** Indicates whether a request has a corresponding reply from the server. */
945   _requestHasReply(data) {
946     return data.messageType == "register" || data.messageType == "unregister";
947   },
949   /**
950    * Sends all pending requests that expect replies. Called after the connection
951    * is established and the handshake is complete.
952    */
953   _sendPendingRequests() {
954     this._enqueue(_ => {
955       for (let request of this._pendingRequests.values()) {
956         this._send(request.data);
957       }
958     });
959   },
961   /** Queues an outgoing request, establishing a connection if necessary. */
962   _queueRequest(data) {
963     lazy.console.debug("queueRequest()", data);
965     if (this._currentState == STATE_READY) {
966       // If we're ready, no need to queue; just send the request.
967       this._send(data);
968       return;
969     }
971     // Otherwise, we're still setting up. If we don't have a request queue,
972     // make one now.
973     if (!this._notifyRequestQueue) {
974       let promise = new Promise(resolve => {
975         this._notifyRequestQueue = resolve;
976       });
977       this._enqueue(_ => promise);
978     }
980     let isRequest = this._requestHasReply(data);
981     if (!isRequest) {
982       // Don't queue requests, since they're stored in `_pendingRequests`, and
983       // `_sendPendingRequests` will send them after reconnecting. Without this
984       // check, we'd send requests twice.
985       this._enqueue(_ => this._send(data));
986     }
988     if (!this._ws) {
989       // This will end up calling notifyRequestQueue().
990       this._beginWSSetup();
991       // If beginWSSetup does not succeed to make ws, notifyRequestQueue will
992       // not be call.
993       if (!this._ws && this._notifyRequestQueue) {
994         this._notifyRequestQueue();
995         this._notifyRequestQueue = null;
996       }
997     }
998   },
1000   _receivedUpdate(aChannelID, aLatestVersion) {
1001     lazy.console.debug(
1002       "receivedUpdate: Updating",
1003       aChannelID,
1004       "->",
1005       aLatestVersion
1006     );
1008     this._mainPushService
1009       .receivedPushMessage(aChannelID, "", null, null, record => {
1010         if (record.version === null || record.version < aLatestVersion) {
1011           lazy.console.debug(
1012             "receivedUpdate: Version changed for",
1013             aChannelID,
1014             aLatestVersion
1015           );
1016           record.version = aLatestVersion;
1017           return record;
1018         }
1019         lazy.console.debug(
1020           "receivedUpdate: No significant version change for",
1021           aChannelID,
1022           aLatestVersion
1023         );
1024         return null;
1025       })
1026       .then(status => {
1027         this._sendAck(aChannelID, aLatestVersion, status);
1028       })
1029       .catch(err => {
1030         lazy.console.error(
1031           "receivedUpdate: Error acknowledging message",
1032           aChannelID,
1033           aLatestVersion,
1034           err
1035         );
1036       });
1037   },
1039   // begin Push protocol handshake
1040   _wsOnStart() {
1041     lazy.console.debug("wsOnStart()");
1043     if (this._currentState != STATE_WAITING_FOR_WS_START) {
1044       lazy.console.error(
1045         "wsOnStart: NOT in STATE_WAITING_FOR_WS_START. Current",
1046         "state",
1047         this._currentState,
1048         "Skipping"
1049       );
1050       return;
1051     }
1053     this._mainPushService
1054       .getAllUnexpired()
1055       .then(
1056         records => this._sendHello(records),
1057         err => {
1058           lazy.console.warn(
1059             "Error fetching existing records before handshake; assuming none",
1060             err
1061           );
1062           this._sendHello([]);
1063         }
1064       )
1065       .catch(err => {
1066         // If we failed to send the handshake, back off and reconnect.
1067         lazy.console.warn("Failed to send handshake; reconnecting", err);
1068         this._reconnect();
1069       });
1070   },
1072   /**
1073    * Sends a `hello` handshake to the server.
1074    *
1075    * @param {Array<PushRecordWebSocket>} An array of records for existing
1076    *        subscriptions, used to determine whether to rotate our UAID.
1077    */
1078   _sendHello(records) {
1079     let data = {
1080       messageType: "hello",
1081       broadcasts: this._broadcastListeners,
1082       use_webpush: true,
1083     };
1085     if (records.length && this._UAID) {
1086       // Only send our UAID if we have existing push subscriptions, to
1087       // avoid tying a persistent identifier to the connection (bug
1088       // 1617136). The push server will issue our client a new UAID in
1089       // the `hello` response, which we'll store until either the next
1090       // time we reconnect, or the user subscribes to push. Once we have a
1091       // push subscription, we'll stop rotating the UAID when we connect,
1092       // so that we can receive push messages for them.
1093       data.uaid = this._UAID;
1094     }
1096     this._wsSendMessage(data);
1097     this._currentState = STATE_WAITING_FOR_HELLO;
1098   },
1100   /**
1101    * This statusCode is not the websocket protocol status code, but the TCP
1102    * connection close status code.
1103    *
1104    * If we do not explicitly call ws.close() then statusCode is always
1105    * NS_BASE_STREAM_CLOSED, even on a successful close.
1106    */
1107   _wsOnStop(context, statusCode) {
1108     lazy.console.debug("wsOnStop()");
1110     if (statusCode != Cr.NS_OK && !this._skipReconnect) {
1111       lazy.console.debug(
1112         "wsOnStop: Reconnecting after socket error",
1113         statusCode
1114       );
1115       this._reconnect();
1116       return;
1117     }
1119     this._shutdownWS();
1120   },
1122   _wsOnMessageAvailable(context, message) {
1123     lazy.console.debug("wsOnMessageAvailable()", message);
1125     // Clearing the last ping time indicates we're no longer waiting for a pong.
1126     this._lastPingTime = 0;
1128     let reply;
1129     try {
1130       reply = JSON.parse(message);
1131     } catch (e) {
1132       lazy.console.warn("wsOnMessageAvailable: Invalid JSON", message, e);
1133       return;
1134     }
1136     // If we receive a message, we know the connection succeeded. Reset the
1137     // connection attempt and ping interval counters.
1138     this._retryFailCount = 0;
1140     let doNotHandle = false;
1141     if (
1142       message === "{}" ||
1143       reply.messageType === undefined ||
1144       reply.messageType === "ping" ||
1145       typeof reply.messageType != "string"
1146     ) {
1147       lazy.console.debug("wsOnMessageAvailable: Pong received");
1148       doNotHandle = true;
1149     }
1151     // Reset the ping timer.  Note: This path is executed at every step of the
1152     // handshake, so this timer does not need to be set explicitly at startup.
1153     this._startPingTimer();
1155     // If it is a ping, do not handle the message.
1156     if (doNotHandle) {
1157       if (!this._hasPendingRequests()) {
1158         this._requestTimeoutTimer.cancel();
1159       }
1160       return;
1161     }
1163     // An allowlist of protocol handlers. Add to these if new messages are added
1164     // in the protocol.
1165     let handlers = [
1166       "Hello",
1167       "Register",
1168       "Unregister",
1169       "Notification",
1170       "Broadcast",
1171     ];
1173     // Build up the handler name to call from messageType.
1174     // e.g. messageType == "register" -> _handleRegisterReply.
1175     let handlerName =
1176       reply.messageType[0].toUpperCase() +
1177       reply.messageType.slice(1).toLowerCase();
1179     if (!handlers.includes(handlerName)) {
1180       lazy.console.warn(
1181         "wsOnMessageAvailable: No allowlisted handler",
1182         handlerName,
1183         "for message",
1184         reply.messageType
1185       );
1186       return;
1187     }
1189     let handler = "_handle" + handlerName + "Reply";
1191     if (typeof this[handler] !== "function") {
1192       lazy.console.warn(
1193         "wsOnMessageAvailable: Handler",
1194         handler,
1195         "allowlisted but not implemented"
1196       );
1197       return;
1198     }
1200     this[handler](reply);
1201   },
1203   /**
1204    * The websocket should never be closed. Since we don't call ws.close(),
1205    * _wsOnStop() receives error code NS_BASE_STREAM_CLOSED (see comment in that
1206    * function), which calls reconnect and re-establishes the WebSocket
1207    * connection.
1208    *
1209    * If the server requested that we back off, we won't reconnect until the
1210    * next network state change event, or until we need to send a new register
1211    * request.
1212    */
1213   _wsOnServerClose(context, aStatusCode, aReason) {
1214     lazy.console.debug("wsOnServerClose()", aStatusCode, aReason);
1216     if (aStatusCode == kBACKOFF_WS_STATUS_CODE) {
1217       lazy.console.debug("wsOnServerClose: Skipping automatic reconnect");
1218       this._skipReconnect = true;
1219     }
1220   },
1222   /**
1223    * Rejects all pending register requests with errors.
1224    */
1225   _cancelPendingRequests() {
1226     for (let request of this._pendingRequests.values()) {
1227       request.reject(new Error("Request aborted"));
1228     }
1229     this._pendingRequests.clear();
1230   },
1232   /** Creates a case-insensitive map key for a request that expects a reply. */
1233   _makePendingRequestKey(data) {
1234     return (data.messageType + "|" + data.channelID).toLowerCase();
1235   },
1237   /** Sends a request and waits for a reply from the server. */
1238   _sendRequestForReply(record, data) {
1239     return Promise.resolve().then(_ => {
1240       // start the timer since we now have at least one request
1241       this._startRequestTimeoutTimer();
1243       let key = this._makePendingRequestKey(data);
1244       if (!this._pendingRequests.has(key)) {
1245         let request = {
1246           data,
1247           record,
1248           ctime: Date.now(),
1249         };
1250         request.promise = new Promise((resolve, reject) => {
1251           request.resolve = resolve;
1252           request.reject = reject;
1253         });
1254         this._pendingRequests.set(key, request);
1255         this._queueRequest(data);
1256       }
1258       return this._pendingRequests.get(key).promise;
1259     });
1260   },
1262   /** Removes and returns a pending request for a server reply. */
1263   _takeRequestForReply(reply) {
1264     if (typeof reply.channelID !== "string") {
1265       return null;
1266     }
1267     let key = this._makePendingRequestKey(reply);
1268     let request = this._pendingRequests.get(key);
1269     if (!request) {
1270       return null;
1271     }
1272     this._pendingRequests.delete(key);
1273     if (!this._hasPendingRequests()) {
1274       this._requestTimeoutTimer.cancel();
1275     }
1276     return request;
1277   },
1279   sendSubscribeBroadcast(serviceId, version) {
1280     this._currentlyRegistering.add(serviceId);
1281     let data = {
1282       messageType: "broadcast_subscribe",
1283       broadcasts: {
1284         [serviceId]: version,
1285       },
1286     };
1288     this._queueRequest(data);
1289   },
1292 function PushRecordWebSocket(record) {
1293   PushRecord.call(this, record);
1294   this.channelID = record.channelID;
1295   this.version = record.version;
1298 PushRecordWebSocket.prototype = Object.create(PushRecord.prototype, {
1299   keyID: {
1300     get() {
1301       return this.channelID;
1302     },
1303   },
1306 PushRecordWebSocket.prototype.toSubscription = function () {
1307   let subscription = PushRecord.prototype.toSubscription.call(this);
1308   subscription.version = this.version;
1309   return subscription;