Bug 1733673 [wpt PR 31066] - Annotate CSS Transforms WPT reftests as fuzzy where...
[gecko.git] / dom / push / PushServiceWebSocket.jsm
blobe4f9e3e38e8f7daa1c75dc9c76bf61d15f5702b2
1 /* This Source Code Form is subject to the terms of the Mozilla Public
2  * License, v. 2.0. If a copy of the MPL was not distributed with this file,
3  * You can obtain one at http://mozilla.org/MPL/2.0/. */
5 "use strict";
7 const { Services } = ChromeUtils.import("resource://gre/modules/Services.jsm");
8 const { XPCOMUtils } = ChromeUtils.import(
9   "resource://gre/modules/XPCOMUtils.jsm"
12 const { PushDB } = ChromeUtils.import("resource://gre/modules/PushDB.jsm");
13 const { PushRecord } = ChromeUtils.import(
14   "resource://gre/modules/PushRecord.jsm"
16 const { PushCrypto } = ChromeUtils.import(
17   "resource://gre/modules/PushCrypto.jsm"
20 ChromeUtils.defineModuleGetter(
21   this,
22   "pushBroadcastService",
23   "resource://gre/modules/PushBroadcastService.jsm"
25 ChromeUtils.defineModuleGetter(
26   this,
27   "ObjectUtils",
28   "resource://gre/modules/ObjectUtils.jsm"
31 const kPUSHWSDB_DB_NAME = "pushapi";
32 const kPUSHWSDB_DB_VERSION = 5; // Change this if the IndexedDB format changes
33 const kPUSHWSDB_STORE_NAME = "pushapi";
35 // WebSocket close code sent by the server to indicate that the client should
36 // not automatically reconnect.
37 const kBACKOFF_WS_STATUS_CODE = 4774;
39 // Maps ack statuses, unsubscribe reasons, and delivery error reasons to codes
40 // included in request payloads.
41 const kACK_STATUS_TO_CODE = {
42   [Ci.nsIPushErrorReporter.ACK_DELIVERED]: 100,
43   [Ci.nsIPushErrorReporter.ACK_DECRYPTION_ERROR]: 101,
44   [Ci.nsIPushErrorReporter.ACK_NOT_DELIVERED]: 102,
47 const kUNREGISTER_REASON_TO_CODE = {
48   [Ci.nsIPushErrorReporter.UNSUBSCRIBE_MANUAL]: 200,
49   [Ci.nsIPushErrorReporter.UNSUBSCRIBE_QUOTA_EXCEEDED]: 201,
50   [Ci.nsIPushErrorReporter.UNSUBSCRIBE_PERMISSION_REVOKED]: 202,
53 const kDELIVERY_REASON_TO_CODE = {
54   [Ci.nsIPushErrorReporter.DELIVERY_UNCAUGHT_EXCEPTION]: 301,
55   [Ci.nsIPushErrorReporter.DELIVERY_UNHANDLED_REJECTION]: 302,
56   [Ci.nsIPushErrorReporter.DELIVERY_INTERNAL_ERROR]: 303,
59 const prefs = Services.prefs.getBranch("dom.push.");
61 const EXPORTED_SYMBOLS = ["PushServiceWebSocket"];
63 XPCOMUtils.defineLazyGetter(this, "console", () => {
64   let { ConsoleAPI } = ChromeUtils.import("resource://gre/modules/Console.jsm");
65   return new ConsoleAPI({
66     maxLogLevelPref: "dom.push.loglevel",
67     prefix: "PushServiceWebSocket",
68   });
69 });
71 /**
72  * A proxy between the PushService and the WebSocket. The listener is used so
73  * that the PushService can silence messages from the WebSocket by setting
74  * PushWebSocketListener._pushService to null. This is required because
75  * a WebSocket can continue to send messages or errors after it has been
76  * closed but the PushService may not be interested in these. It's easier to
77  * stop listening than to have checks at specific points.
78  */
79 var PushWebSocketListener = function(pushService) {
80   this._pushService = pushService;
83 PushWebSocketListener.prototype = {
84   onStart(context) {
85     if (!this._pushService) {
86       return;
87     }
88     this._pushService._wsOnStart(context);
89   },
91   onStop(context, statusCode) {
92     if (!this._pushService) {
93       return;
94     }
95     this._pushService._wsOnStop(context, statusCode);
96   },
98   onAcknowledge(context, size) {
99     // EMPTY
100   },
102   onBinaryMessageAvailable(context, message) {
103     // EMPTY
104   },
106   onMessageAvailable(context, message) {
107     if (!this._pushService) {
108       return;
109     }
110     this._pushService._wsOnMessageAvailable(context, message);
111   },
113   onServerClose(context, aStatusCode, aReason) {
114     if (!this._pushService) {
115       return;
116     }
117     this._pushService._wsOnServerClose(context, aStatusCode, aReason);
118   },
121 // websocket states
122 // websocket is off
123 const STATE_SHUT_DOWN = 0;
124 // Websocket has been opened on client side, waiting for successful open.
125 // (_wsOnStart)
126 const STATE_WAITING_FOR_WS_START = 1;
127 // Websocket opened, hello sent, waiting for server reply (_handleHelloReply).
128 const STATE_WAITING_FOR_HELLO = 2;
129 // Websocket operational, handshake completed, begin protocol messaging.
130 const STATE_READY = 3;
132 var PushServiceWebSocket = {
133   _mainPushService: null,
134   _serverURI: null,
135   _currentlyRegistering: new Set(),
137   newPushDB() {
138     return new PushDB(
139       kPUSHWSDB_DB_NAME,
140       kPUSHWSDB_DB_VERSION,
141       kPUSHWSDB_STORE_NAME,
142       "channelID",
143       PushRecordWebSocket
144     );
145   },
147   disconnect() {
148     this._shutdownWS();
149   },
151   observe(aSubject, aTopic, aData) {
152     if (aTopic == "nsPref:changed" && aData == "dom.push.userAgentID") {
153       this._onUAIDChanged();
154     } else if (aTopic == "timer-callback") {
155       this._onTimerFired(aSubject);
156     }
157   },
159   /**
160    * Handles a UAID change. Unlike reconnects, we cancel all pending requests
161    * after disconnecting. Existing subscriptions stored in IndexedDB will be
162    * dropped on reconnect.
163    */
164   _onUAIDChanged() {
165     console.debug("onUAIDChanged()");
167     this._shutdownWS();
168     this._startBackoffTimer();
169   },
171   /** Handles a ping, backoff, or request timeout timer event. */
172   _onTimerFired(timer) {
173     console.debug("onTimerFired()");
175     if (timer == this._pingTimer) {
176       this._sendPing();
177       return;
178     }
180     if (timer == this._backoffTimer) {
181       console.debug("onTimerFired: Reconnecting after backoff");
182       this._beginWSSetup();
183       return;
184     }
186     if (timer == this._requestTimeoutTimer) {
187       this._timeOutRequests();
188     }
189   },
191   /**
192    * Sends a ping to the server. Bypasses the request queue, but starts the
193    * request timeout timer. If the socket is already closed, or the server
194    * does not respond within the timeout, the client will reconnect.
195    */
196   _sendPing() {
197     console.debug("sendPing()");
199     this._startRequestTimeoutTimer();
200     try {
201       this._wsSendMessage({});
202       this._lastPingTime = Date.now();
203     } catch (e) {
204       console.debug("sendPing: Error sending ping", e);
205       this._reconnect();
206     }
207   },
209   /** Times out any pending requests. */
210   _timeOutRequests() {
211     console.debug("timeOutRequests()");
213     if (!this._hasPendingRequests()) {
214       // Cancel the repeating timer and exit early if we aren't waiting for
215       // pongs or requests.
216       this._requestTimeoutTimer.cancel();
217       return;
218     }
220     let now = Date.now();
222     // Set to true if at least one request timed out, or we're still waiting
223     // for a pong after the request timeout.
224     let requestTimedOut = false;
226     if (
227       this._lastPingTime > 0 &&
228       now - this._lastPingTime > this._requestTimeout
229     ) {
230       console.debug("timeOutRequests: Did not receive pong in time");
231       requestTimedOut = true;
232     } else {
233       for (let [key, request] of this._pendingRequests) {
234         let duration = now - request.ctime;
235         // If any of the registration requests time out, all the ones after it
236         // also made to fail, since we are going to be disconnecting the
237         // socket.
238         requestTimedOut |= duration > this._requestTimeout;
239         if (requestTimedOut) {
240           request.reject(new Error("Request timed out: " + key));
241           this._pendingRequests.delete(key);
242         }
243       }
244     }
246     // The most likely reason for a pong or registration request timing out is
247     // that the socket has disconnected. Best to reconnect.
248     if (requestTimedOut) {
249       this._reconnect();
250     }
251   },
253   get _UAID() {
254     return prefs.getStringPref("userAgentID");
255   },
257   set _UAID(newID) {
258     if (typeof newID !== "string") {
259       console.warn(
260         "Got invalid, non-string UAID",
261         newID,
262         "Not updating userAgentID"
263       );
264       return;
265     }
266     console.debug("New _UAID", newID);
267     prefs.setStringPref("userAgentID", newID);
268   },
270   _ws: null,
271   _pendingRequests: new Map(),
272   _currentState: STATE_SHUT_DOWN,
273   _requestTimeout: 0,
274   _requestTimeoutTimer: null,
275   _retryFailCount: 0,
277   /**
278    * According to the WS spec, servers should immediately close the underlying
279    * TCP connection after they close a WebSocket. This causes wsOnStop to be
280    * called with error NS_BASE_STREAM_CLOSED. Since the client has to keep the
281    * WebSocket up, it should try to reconnect. But if the server closes the
282    * WebSocket because it wants the client to back off, then the client
283    * shouldn't re-establish the connection. If the server sends the backoff
284    * close code, this field will be set to true in wsOnServerClose. It is
285    * checked in wsOnStop.
286    */
287   _skipReconnect: false,
289   /** Indicates whether the server supports Web Push-style message delivery. */
290   _dataEnabled: false,
292   /**
293    * The last time the client sent a ping to the server. If non-zero, keeps the
294    * request timeout timer active. Reset to zero when the server responds with
295    * a pong or pending messages.
296    */
297   _lastPingTime: 0,
299   /**
300    * A one-shot timer used to ping the server, to avoid timing out idle
301    * connections. Reset to the ping interval on each incoming message.
302    */
303   _pingTimer: null,
305   /** A one-shot timer fired after the reconnect backoff period. */
306   _backoffTimer: null,
308   /**
309    * Sends a message to the Push Server through an open websocket.
310    * typeof(msg) shall be an object
311    */
312   _wsSendMessage(msg) {
313     if (!this._ws) {
314       console.warn(
315         "wsSendMessage: No WebSocket initialized.",
316         "Cannot send a message"
317       );
318       return;
319     }
320     msg = JSON.stringify(msg);
321     console.debug("wsSendMessage: Sending message", msg);
322     this._ws.sendMsg(msg);
323   },
325   init(options, mainPushService, serverURI) {
326     console.debug("init()");
328     this._mainPushService = mainPushService;
329     this._serverURI = serverURI;
330     // Filled in at connect() time
331     this._broadcastListeners = null;
333     // Override the default WebSocket factory function. The returned object
334     // must be null or satisfy the nsIWebSocketChannel interface. Used by
335     // the tests to provide a mock WebSocket implementation.
336     if (options.makeWebSocket) {
337       this._makeWebSocket = options.makeWebSocket;
338     }
340     this._requestTimeout = prefs.getIntPref("requestTimeout");
342     return Promise.resolve();
343   },
345   _reconnect() {
346     console.debug("reconnect()");
347     this._shutdownWS(false);
348     this._startBackoffTimer();
349   },
351   _shutdownWS(shouldCancelPending = true) {
352     console.debug("shutdownWS()");
354     if (this._currentState == STATE_READY) {
355       prefs.removeObserver("userAgentID", this);
356     }
358     this._currentState = STATE_SHUT_DOWN;
359     this._skipReconnect = false;
361     if (this._wsListener) {
362       this._wsListener._pushService = null;
363     }
364     try {
365       this._ws.close(0, null);
366     } catch (e) {}
367     this._ws = null;
369     this._lastPingTime = 0;
371     if (this._pingTimer) {
372       this._pingTimer.cancel();
373     }
375     if (shouldCancelPending) {
376       this._cancelPendingRequests();
377     }
379     if (this._notifyRequestQueue) {
380       this._notifyRequestQueue();
381       this._notifyRequestQueue = null;
382     }
383   },
385   uninit() {
386     // All pending requests (ideally none) are dropped at this point. We
387     // shouldn't have any applications performing registration/unregistration
388     // or receiving notifications.
389     this._shutdownWS();
391     if (this._backoffTimer) {
392       this._backoffTimer.cancel();
393     }
394     if (this._requestTimeoutTimer) {
395       this._requestTimeoutTimer.cancel();
396     }
398     this._mainPushService = null;
400     this._dataEnabled = false;
401   },
403   /**
404    * How retries work: If the WS is closed due to a socket error,
405    * _startBackoffTimer() is called. The retry timer is started and when
406    * it times out, beginWSSetup() is called again.
407    *
408    * If we are in the middle of a timeout (i.e. waiting), but
409    * a register/unregister is called, we don't want to wait around anymore.
410    * _sendRequest will automatically call beginWSSetup(), which will cancel the
411    * timer. In addition since the state will have changed, even if a pending
412    * timer event comes in (because the timer fired the event before it was
413    * cancelled), so the connection won't be reset.
414    */
415   _startBackoffTimer() {
416     console.debug("startBackoffTimer()");
418     // Calculate new timeout, but cap it to pingInterval.
419     let retryTimeout =
420       prefs.getIntPref("retryBaseInterval") * Math.pow(2, this._retryFailCount);
421     retryTimeout = Math.min(retryTimeout, prefs.getIntPref("pingInterval"));
423     this._retryFailCount++;
425     console.debug(
426       "startBackoffTimer: Retry in",
427       retryTimeout,
428       "Try number",
429       this._retryFailCount
430     );
432     if (!this._backoffTimer) {
433       this._backoffTimer = Cc["@mozilla.org/timer;1"].createInstance(
434         Ci.nsITimer
435       );
436     }
437     this._backoffTimer.init(this, retryTimeout, Ci.nsITimer.TYPE_ONE_SHOT);
438   },
440   /** Indicates whether we're waiting for pongs or requests. */
441   _hasPendingRequests() {
442     return this._lastPingTime > 0 || this._pendingRequests.size > 0;
443   },
445   /**
446    * Starts the request timeout timer unless we're already waiting for a pong
447    * or register request.
448    */
449   _startRequestTimeoutTimer() {
450     if (this._hasPendingRequests()) {
451       return;
452     }
453     if (!this._requestTimeoutTimer) {
454       this._requestTimeoutTimer = Cc["@mozilla.org/timer;1"].createInstance(
455         Ci.nsITimer
456       );
457     }
458     this._requestTimeoutTimer.init(
459       this,
460       this._requestTimeout,
461       Ci.nsITimer.TYPE_REPEATING_SLACK
462     );
463   },
465   /** Starts or resets the ping timer. */
466   _startPingTimer() {
467     if (!this._pingTimer) {
468       this._pingTimer = Cc["@mozilla.org/timer;1"].createInstance(Ci.nsITimer);
469     }
470     this._pingTimer.init(
471       this,
472       prefs.getIntPref("pingInterval"),
473       Ci.nsITimer.TYPE_ONE_SHOT
474     );
475   },
477   _makeWebSocket(uri) {
478     if (!prefs.getBoolPref("connection.enabled")) {
479       console.warn(
480         "makeWebSocket: connection.enabled is not set to true.",
481         "Aborting."
482       );
483       return null;
484     }
485     if (Services.io.offline) {
486       console.warn("makeWebSocket: Network is offline.");
487       return null;
488     }
489     let contractId =
490       uri.scheme == "ws"
491         ? "@mozilla.org/network/protocol;1?name=ws"
492         : "@mozilla.org/network/protocol;1?name=wss";
493     let socket = Cc[contractId].createInstance(Ci.nsIWebSocketChannel);
495     socket.initLoadInfo(
496       null, // aLoadingNode
497       Services.scriptSecurityManager.getSystemPrincipal(),
498       null, // aTriggeringPrincipal
499       Ci.nsILoadInfo.SEC_ALLOW_CROSS_ORIGIN_SEC_CONTEXT_IS_NULL,
500       Ci.nsIContentPolicy.TYPE_WEBSOCKET
501     );
502     // Allow deprecated HTTP request from SystemPrincipal
503     socket.loadInfo.allowDeprecatedSystemRequests = true;
505     return socket;
506   },
508   _beginWSSetup() {
509     console.debug("beginWSSetup()");
510     if (this._currentState != STATE_SHUT_DOWN) {
511       console.error(
512         "_beginWSSetup: Not in shutdown state! Current state",
513         this._currentState
514       );
515       return;
516     }
518     // Stop any pending reconnects scheduled for the near future.
519     if (this._backoffTimer) {
520       this._backoffTimer.cancel();
521     }
523     let uri = this._serverURI;
524     if (!uri) {
525       return;
526     }
527     let socket = this._makeWebSocket(uri);
528     if (!socket) {
529       return;
530     }
531     this._ws = socket.QueryInterface(Ci.nsIWebSocketChannel);
533     console.debug("beginWSSetup: Connecting to", uri.spec);
534     this._wsListener = new PushWebSocketListener(this);
535     this._ws.protocol = "push-notification";
537     try {
538       // Grab a wakelock before we open the socket to ensure we don't go to
539       // sleep before connection the is opened.
540       this._ws.asyncOpen(uri, uri.spec, {}, 0, this._wsListener, null);
541       this._currentState = STATE_WAITING_FOR_WS_START;
542     } catch (e) {
543       console.error(
544         "beginWSSetup: Error opening websocket.",
545         "asyncOpen failed",
546         e
547       );
548       this._reconnect();
549     }
550   },
552   connect(broadcastListeners) {
553     console.debug("connect()", broadcastListeners);
554     this._broadcastListeners = broadcastListeners;
555     this._beginWSSetup();
556   },
558   isConnected() {
559     return !!this._ws;
560   },
562   /**
563    * Protocol handler invoked by server message.
564    */
565   _handleHelloReply(reply) {
566     console.debug("handleHelloReply()");
567     if (this._currentState != STATE_WAITING_FOR_HELLO) {
568       console.error(
569         "handleHelloReply: Unexpected state",
570         this._currentState,
571         "(expected STATE_WAITING_FOR_HELLO)"
572       );
573       this._shutdownWS();
574       return;
575     }
577     if (typeof reply.uaid !== "string") {
578       console.error("handleHelloReply: Received invalid UAID", reply.uaid);
579       this._shutdownWS();
580       return;
581     }
583     if (reply.uaid === "") {
584       console.error("handleHelloReply: Received empty UAID");
585       this._shutdownWS();
586       return;
587     }
589     // To avoid sticking extra large values sent by an evil server into prefs.
590     if (reply.uaid.length > 128) {
591       console.error(
592         "handleHelloReply: UAID received from server was too long",
593         reply.uaid
594       );
595       this._shutdownWS();
596       return;
597     }
599     let sendRequests = () => {
600       if (this._notifyRequestQueue) {
601         this._notifyRequestQueue();
602         this._notifyRequestQueue = null;
603       }
604       this._sendPendingRequests();
605     };
607     function finishHandshake() {
608       this._UAID = reply.uaid;
609       this._currentState = STATE_READY;
610       prefs.addObserver("userAgentID", this);
612       // Handle broadcasts received in response to the "hello" message.
613       if (!ObjectUtils.isEmpty(reply.broadcasts)) {
614         // The reply isn't technically a broadcast message, but it has
615         // the shape of a broadcast message (it has a broadcasts field).
616         const context = { phase: pushBroadcastService.PHASES.HELLO };
617         this._mainPushService.receivedBroadcastMessage(reply, context);
618       }
620       this._dataEnabled = !!reply.use_webpush;
621       if (this._dataEnabled) {
622         this._mainPushService
623           .getAllUnexpired()
624           .then(records =>
625             Promise.all(
626               records.map(record =>
627                 this._mainPushService.ensureCrypto(record).catch(error => {
628                   console.error(
629                     "finishHandshake: Error updating record",
630                     record.keyID,
631                     error
632                   );
633                 })
634               )
635             )
636           )
637           .then(sendRequests);
638       } else {
639         sendRequests();
640       }
641     }
643     // By this point we've got a UAID from the server that we are ready to
644     // accept.
645     //
646     // We unconditionally drop all existing registrations and notify service
647     // workers if we receive a new UAID. This ensures we expunge all stale
648     // registrations if the `userAgentID` pref is reset.
649     if (this._UAID != reply.uaid) {
650       console.debug("handleHelloReply: Received new UAID");
652       this._mainPushService
653         .dropUnexpiredRegistrations()
654         .then(finishHandshake.bind(this));
656       return;
657     }
659     // otherwise we are good to go
660     finishHandshake.bind(this)();
661   },
663   /**
664    * Protocol handler invoked by server message.
665    */
666   _handleRegisterReply(reply) {
667     console.debug("handleRegisterReply()");
669     let tmp = this._takeRequestForReply(reply);
670     if (!tmp) {
671       return;
672     }
674     if (reply.status == 200) {
675       try {
676         Services.io.newURI(reply.pushEndpoint);
677       } catch (e) {
678         tmp.reject(new Error("Invalid push endpoint: " + reply.pushEndpoint));
679         return;
680       }
682       let record = new PushRecordWebSocket({
683         channelID: reply.channelID,
684         pushEndpoint: reply.pushEndpoint,
685         scope: tmp.record.scope,
686         originAttributes: tmp.record.originAttributes,
687         version: null,
688         systemRecord: tmp.record.systemRecord,
689         appServerKey: tmp.record.appServerKey,
690         ctime: Date.now(),
691       });
692       tmp.resolve(record);
693     } else {
694       console.error("handleRegisterReply: Unexpected server response", reply);
695       tmp.reject(
696         new Error("Wrong status code for register reply: " + reply.status)
697       );
698     }
699   },
701   _handleUnregisterReply(reply) {
702     console.debug("handleUnregisterReply()");
704     let request = this._takeRequestForReply(reply);
705     if (!request) {
706       return;
707     }
709     let success = reply.status === 200;
710     request.resolve(success);
711   },
713   _handleDataUpdate(update) {
714     let promise;
715     if (typeof update.channelID != "string") {
716       console.warn(
717         "handleDataUpdate: Discarding update without channel ID",
718         update
719       );
720       return;
721     }
722     function updateRecord(record) {
723       // Ignore messages that we've already processed. This can happen if the
724       // connection drops between notifying the service worker and acking the
725       // the message. In that case, the server will re-send the message on
726       // reconnect.
727       if (record.hasRecentMessageID(update.version)) {
728         console.warn(
729           "handleDataUpdate: Ignoring duplicate message",
730           update.version
731         );
732         return null;
733       }
734       record.noteRecentMessageID(update.version);
735       return record;
736     }
737     if (typeof update.data != "string") {
738       promise = this._mainPushService.receivedPushMessage(
739         update.channelID,
740         update.version,
741         null,
742         null,
743         updateRecord
744       );
745     } else {
746       let message = ChromeUtils.base64URLDecode(update.data, {
747         // The Push server may append padding.
748         padding: "ignore",
749       });
750       promise = this._mainPushService.receivedPushMessage(
751         update.channelID,
752         update.version,
753         update.headers,
754         message,
755         updateRecord
756       );
757     }
758     promise
759       .then(
760         status => {
761           this._sendAck(update.channelID, update.version, status);
762         },
763         err => {
764           console.error(
765             "handleDataUpdate: Error delivering message",
766             update,
767             err
768           );
769           this._sendAck(
770             update.channelID,
771             update.version,
772             Ci.nsIPushErrorReporter.ACK_DECRYPTION_ERROR
773           );
774         }
775       )
776       .catch(err => {
777         console.error(
778           "handleDataUpdate: Error acknowledging message",
779           update,
780           err
781         );
782       });
783   },
785   /**
786    * Protocol handler invoked by server message.
787    */
788   _handleNotificationReply(reply) {
789     console.debug("handleNotificationReply()");
790     if (this._dataEnabled) {
791       this._handleDataUpdate(reply);
792       return;
793     }
795     if (typeof reply.updates !== "object") {
796       console.warn("handleNotificationReply: Missing updates", reply.updates);
797       return;
798     }
800     console.debug("handleNotificationReply: Got updates", reply.updates);
801     for (let i = 0; i < reply.updates.length; i++) {
802       let update = reply.updates[i];
803       console.debug("handleNotificationReply: Handling update", update);
804       if (typeof update.channelID !== "string") {
805         console.debug(
806           "handleNotificationReply: Invalid update at index",
807           i,
808           update
809         );
810         continue;
811       }
813       if (update.version === undefined) {
814         console.debug("handleNotificationReply: Missing version", update);
815         continue;
816       }
818       let version = update.version;
820       if (typeof version === "string") {
821         version = parseInt(version, 10);
822       }
824       if (typeof version === "number" && version >= 0) {
825         // FIXME(nsm): this relies on app update notification being infallible!
826         // eventually fix this
827         this._receivedUpdate(update.channelID, version);
828       }
829     }
830   },
832   _handleBroadcastReply(reply) {
833     let phase = pushBroadcastService.PHASES.BROADCAST;
834     // Check if this reply is the result of registration.
835     for (const id of Object.keys(reply.broadcasts)) {
836       const wasRegistering = this._currentlyRegistering.delete(id);
837       if (wasRegistering) {
838         // If we get multiple broadcasts and only one is "registering",
839         // then we consider the phase to be REGISTER for all of them.
840         // It is acceptable since registrations do not happen so often,
841         // and are all very likely to occur soon after browser startup.
842         phase = pushBroadcastService.PHASES.REGISTER;
843       }
844     }
845     const context = { phase };
846     this._mainPushService.receivedBroadcastMessage(reply, context);
847   },
849   reportDeliveryError(messageID, reason) {
850     console.debug("reportDeliveryError()");
851     let code = kDELIVERY_REASON_TO_CODE[reason];
852     if (!code) {
853       throw new Error("Invalid delivery error reason");
854     }
855     let data = { messageType: "nack", version: messageID, code };
856     this._queueRequest(data);
857   },
859   _sendAck(channelID, version, status) {
860     console.debug("sendAck()");
861     let code = kACK_STATUS_TO_CODE[status];
862     if (!code) {
863       throw new Error("Invalid ack status");
864     }
865     let data = { messageType: "ack", updates: [{ channelID, version, code }] };
866     this._queueRequest(data);
867   },
869   _generateID() {
870     // generateUUID() gives a UUID surrounded by {...}, slice them off.
871     return Services.uuid
872       .generateUUID()
873       .toString()
874       .slice(1, -1);
875   },
877   register(record) {
878     console.debug("register() ", record);
880     let data = { channelID: this._generateID(), messageType: "register" };
882     if (record.appServerKey) {
883       data.key = ChromeUtils.base64URLEncode(record.appServerKey, {
884         // The Push server requires padding.
885         pad: true,
886       });
887     }
889     return this._sendRequestForReply(record, data).then(record => {
890       if (!this._dataEnabled) {
891         return record;
892       }
893       return PushCrypto.generateKeys().then(([publicKey, privateKey]) => {
894         record.p256dhPublicKey = publicKey;
895         record.p256dhPrivateKey = privateKey;
896         record.authenticationSecret = PushCrypto.generateAuthenticationSecret();
897         return record;
898       });
899     });
900   },
902   unregister(record, reason) {
903     console.debug("unregister() ", record, reason);
905     return Promise.resolve().then(_ => {
906       let code = kUNREGISTER_REASON_TO_CODE[reason];
907       if (!code) {
908         throw new Error("Invalid unregister reason");
909       }
910       let data = {
911         channelID: record.channelID,
912         messageType: "unregister",
913         code,
914       };
916       return this._sendRequestForReply(record, data);
917     });
918   },
920   _queueStart: Promise.resolve(),
921   _notifyRequestQueue: null,
922   _queue: null,
923   _enqueue(op) {
924     console.debug("enqueue()");
925     if (!this._queue) {
926       this._queue = this._queueStart;
927     }
928     this._queue = this._queue.then(op).catch(_ => {});
929   },
931   /** Sends a request to the server. */
932   _send(data) {
933     if (this._currentState != STATE_READY) {
934       console.warn(
935         "send: Unexpected state; ignoring message",
936         this._currentState
937       );
938       return;
939     }
940     if (!this._requestHasReply(data)) {
941       this._wsSendMessage(data);
942       return;
943     }
944     // If we're expecting a reply, check that we haven't cancelled the request.
945     let key = this._makePendingRequestKey(data);
946     if (!this._pendingRequests.has(key)) {
947       console.log("send: Request cancelled; ignoring message", key);
948       return;
949     }
950     this._wsSendMessage(data);
951   },
953   /** Indicates whether a request has a corresponding reply from the server. */
954   _requestHasReply(data) {
955     return data.messageType == "register" || data.messageType == "unregister";
956   },
958   /**
959    * Sends all pending requests that expect replies. Called after the connection
960    * is established and the handshake is complete.
961    */
962   _sendPendingRequests() {
963     this._enqueue(_ => {
964       for (let request of this._pendingRequests.values()) {
965         this._send(request.data);
966       }
967     });
968   },
970   /** Queues an outgoing request, establishing a connection if necessary. */
971   _queueRequest(data) {
972     console.debug("queueRequest()", data);
974     if (this._currentState == STATE_READY) {
975       // If we're ready, no need to queue; just send the request.
976       this._send(data);
977       return;
978     }
980     // Otherwise, we're still setting up. If we don't have a request queue,
981     // make one now.
982     if (!this._notifyRequestQueue) {
983       let promise = new Promise((resolve, reject) => {
984         this._notifyRequestQueue = resolve;
985       });
986       this._enqueue(_ => promise);
987     }
989     let isRequest = this._requestHasReply(data);
990     if (!isRequest) {
991       // Don't queue requests, since they're stored in `_pendingRequests`, and
992       // `_sendPendingRequests` will send them after reconnecting. Without this
993       // check, we'd send requests twice.
994       this._enqueue(_ => this._send(data));
995     }
997     if (!this._ws) {
998       // This will end up calling notifyRequestQueue().
999       this._beginWSSetup();
1000       // If beginWSSetup does not succeed to make ws, notifyRequestQueue will
1001       // not be call.
1002       if (!this._ws && this._notifyRequestQueue) {
1003         this._notifyRequestQueue();
1004         this._notifyRequestQueue = null;
1005       }
1006     }
1007   },
1009   _receivedUpdate(aChannelID, aLatestVersion) {
1010     console.debug("receivedUpdate: Updating", aChannelID, "->", aLatestVersion);
1012     this._mainPushService
1013       .receivedPushMessage(aChannelID, "", null, null, record => {
1014         if (record.version === null || record.version < aLatestVersion) {
1015           console.debug(
1016             "receivedUpdate: Version changed for",
1017             aChannelID,
1018             aLatestVersion
1019           );
1020           record.version = aLatestVersion;
1021           return record;
1022         }
1023         console.debug(
1024           "receivedUpdate: No significant version change for",
1025           aChannelID,
1026           aLatestVersion
1027         );
1028         return null;
1029       })
1030       .then(status => {
1031         this._sendAck(aChannelID, aLatestVersion, status);
1032       })
1033       .catch(err => {
1034         console.error(
1035           "receivedUpdate: Error acknowledging message",
1036           aChannelID,
1037           aLatestVersion,
1038           err
1039         );
1040       });
1041   },
1043   // begin Push protocol handshake
1044   _wsOnStart(context) {
1045     console.debug("wsOnStart()");
1047     if (this._currentState != STATE_WAITING_FOR_WS_START) {
1048       console.error(
1049         "wsOnStart: NOT in STATE_WAITING_FOR_WS_START. Current",
1050         "state",
1051         this._currentState,
1052         "Skipping"
1053       );
1054       return;
1055     }
1057     this._mainPushService
1058       .getAllUnexpired()
1059       .then(
1060         records => this._sendHello(records),
1061         err => {
1062           console.warn(
1063             "Error fetching existing records before handshake; assuming none",
1064             err
1065           );
1066           this._sendHello([]);
1067         }
1068       )
1069       .catch(err => {
1070         // If we failed to send the handshake, back off and reconnect.
1071         console.warn("Failed to send handshake; reconnecting", err);
1072         this._reconnect();
1073       });
1074   },
1076   /**
1077    * Sends a `hello` handshake to the server.
1078    *
1079    * @param {Array<PushRecordWebSocket>} An array of records for existing
1080    *        subscriptions, used to determine whether to rotate our UAID.
1081    */
1082   _sendHello(records) {
1083     let data = {
1084       messageType: "hello",
1085       broadcasts: this._broadcastListeners,
1086       use_webpush: true,
1087     };
1089     if (records.length && this._UAID) {
1090       // Only send our UAID if we have existing push subscriptions, to
1091       // avoid tying a persistent identifier to the connection (bug
1092       // 1617136). The push server will issue our client a new UAID in
1093       // the `hello` response, which we'll store until either the next
1094       // time we reconnect, or the user subscribes to push. Once we have a
1095       // push subscription, we'll stop rotating the UAID when we connect,
1096       // so that we can receive push messages for them.
1097       data.uaid = this._UAID;
1098     }
1100     this._wsSendMessage(data);
1101     this._currentState = STATE_WAITING_FOR_HELLO;
1102   },
1104   /**
1105    * This statusCode is not the websocket protocol status code, but the TCP
1106    * connection close status code.
1107    *
1108    * If we do not explicitly call ws.close() then statusCode is always
1109    * NS_BASE_STREAM_CLOSED, even on a successful close.
1110    */
1111   _wsOnStop(context, statusCode) {
1112     console.debug("wsOnStop()");
1114     if (statusCode != Cr.NS_OK && !this._skipReconnect) {
1115       console.debug("wsOnStop: Reconnecting after socket error", statusCode);
1116       this._reconnect();
1117       return;
1118     }
1120     this._shutdownWS();
1121   },
1123   _wsOnMessageAvailable(context, message) {
1124     console.debug("wsOnMessageAvailable()", message);
1126     // Clearing the last ping time indicates we're no longer waiting for a pong.
1127     this._lastPingTime = 0;
1129     let reply;
1130     try {
1131       reply = JSON.parse(message);
1132     } catch (e) {
1133       console.warn("wsOnMessageAvailable: Invalid JSON", message, e);
1134       return;
1135     }
1137     // If we receive a message, we know the connection succeeded. Reset the
1138     // connection attempt and ping interval counters.
1139     this._retryFailCount = 0;
1141     let doNotHandle = false;
1142     if (
1143       message === "{}" ||
1144       reply.messageType === undefined ||
1145       reply.messageType === "ping" ||
1146       typeof reply.messageType != "string"
1147     ) {
1148       console.debug("wsOnMessageAvailable: Pong received");
1149       doNotHandle = true;
1150     }
1152     // Reset the ping timer.  Note: This path is executed at every step of the
1153     // handshake, so this timer does not need to be set explicitly at startup.
1154     this._startPingTimer();
1156     // If it is a ping, do not handle the message.
1157     if (doNotHandle) {
1158       return;
1159     }
1161     // A whitelist of protocol handlers. Add to these if new messages are added
1162     // in the protocol.
1163     let handlers = [
1164       "Hello",
1165       "Register",
1166       "Unregister",
1167       "Notification",
1168       "Broadcast",
1169     ];
1171     // Build up the handler name to call from messageType.
1172     // e.g. messageType == "register" -> _handleRegisterReply.
1173     let handlerName =
1174       reply.messageType[0].toUpperCase() +
1175       reply.messageType.slice(1).toLowerCase();
1177     if (!handlers.includes(handlerName)) {
1178       console.warn(
1179         "wsOnMessageAvailable: No whitelisted handler",
1180         handlerName,
1181         "for message",
1182         reply.messageType
1183       );
1184       return;
1185     }
1187     let handler = "_handle" + handlerName + "Reply";
1189     if (typeof this[handler] !== "function") {
1190       console.warn(
1191         "wsOnMessageAvailable: Handler",
1192         handler,
1193         "whitelisted but not implemented"
1194       );
1195       return;
1196     }
1198     this[handler](reply);
1199   },
1201   /**
1202    * The websocket should never be closed. Since we don't call ws.close(),
1203    * _wsOnStop() receives error code NS_BASE_STREAM_CLOSED (see comment in that
1204    * function), which calls reconnect and re-establishes the WebSocket
1205    * connection.
1206    *
1207    * If the server requested that we back off, we won't reconnect until the
1208    * next network state change event, or until we need to send a new register
1209    * request.
1210    */
1211   _wsOnServerClose(context, aStatusCode, aReason) {
1212     console.debug("wsOnServerClose()", aStatusCode, aReason);
1214     if (aStatusCode == kBACKOFF_WS_STATUS_CODE) {
1215       console.debug("wsOnServerClose: Skipping automatic reconnect");
1216       this._skipReconnect = true;
1217     }
1218   },
1220   /**
1221    * Rejects all pending register requests with errors.
1222    */
1223   _cancelPendingRequests() {
1224     for (let request of this._pendingRequests.values()) {
1225       request.reject(new Error("Request aborted"));
1226     }
1227     this._pendingRequests.clear();
1228   },
1230   /** Creates a case-insensitive map key for a request that expects a reply. */
1231   _makePendingRequestKey(data) {
1232     return (data.messageType + "|" + data.channelID).toLowerCase();
1233   },
1235   /** Sends a request and waits for a reply from the server. */
1236   _sendRequestForReply(record, data) {
1237     return Promise.resolve().then(_ => {
1238       // start the timer since we now have at least one request
1239       this._startRequestTimeoutTimer();
1241       let key = this._makePendingRequestKey(data);
1242       if (!this._pendingRequests.has(key)) {
1243         let request = {
1244           data,
1245           record,
1246           ctime: Date.now(),
1247         };
1248         request.promise = new Promise((resolve, reject) => {
1249           request.resolve = resolve;
1250           request.reject = reject;
1251         });
1252         this._pendingRequests.set(key, request);
1253         this._queueRequest(data);
1254       }
1256       return this._pendingRequests.get(key).promise;
1257     });
1258   },
1260   /** Removes and returns a pending request for a server reply. */
1261   _takeRequestForReply(reply) {
1262     if (typeof reply.channelID !== "string") {
1263       return null;
1264     }
1265     let key = this._makePendingRequestKey(reply);
1266     let request = this._pendingRequests.get(key);
1267     if (!request) {
1268       return null;
1269     }
1270     this._pendingRequests.delete(key);
1271     if (!this._hasPendingRequests()) {
1272       this._requestTimeoutTimer.cancel();
1273     }
1274     return request;
1275   },
1277   sendSubscribeBroadcast(serviceId, version) {
1278     this._currentlyRegistering.add(serviceId);
1279     let data = {
1280       messageType: "broadcast_subscribe",
1281       broadcasts: {
1282         [serviceId]: version,
1283       },
1284     };
1286     this._queueRequest(data);
1287   },
1290 function PushRecordWebSocket(record) {
1291   PushRecord.call(this, record);
1292   this.channelID = record.channelID;
1293   this.version = record.version;
1296 PushRecordWebSocket.prototype = Object.create(PushRecord.prototype, {
1297   keyID: {
1298     get() {
1299       return this.channelID;
1300     },
1301   },
1304 PushRecordWebSocket.prototype.toSubscription = function() {
1305   let subscription = PushRecord.prototype.toSubscription.call(this);
1306   subscription.version = this.version;
1307   return subscription;