1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
3 * You can obtain one at http://mozilla.org/MPL/2.0/. */
5 import { PushDB } from "resource://gre/modules/PushDB.sys.mjs";
6 import { PushRecord } from "resource://gre/modules/PushRecord.sys.mjs";
7 import { PushCrypto } from "resource://gre/modules/PushCrypto.sys.mjs";
// Registers getters on `lazy` for two modules: ObjectUtils (used for the
// isEmpty() check in _handleHelloReply) and pushBroadcastService (source of
// the PHASES constants attached to broadcast messages).
11 ChromeUtils.defineESModuleGetters(lazy, {
12 ObjectUtils: "resource://gre/modules/ObjectUtils.sys.mjs",
13 pushBroadcastService: "resource://gre/modules/PushBroadcastService.sys.mjs",
// Name, schema version and object store name of the push database. The
// version and store name are also listed as members of PushServiceWebSocket.
16 const kPUSHWSDB_DB_NAME = "pushapi";
17 const kPUSHWSDB_DB_VERSION = 5; // Change this if the IndexedDB format changes
18 const kPUSHWSDB_STORE_NAME = "pushapi";
20 // WebSocket close code sent by the server to indicate that the client should
21 // not automatically reconnect.
// Compared against the close code in _wsOnServerClose().
22 const kBACKOFF_WS_STATUS_CODE = 4774;
24 // Maps ack statuses, unsubscribe reasons, and delivery error reasons to codes
25 // included in request payloads.
// Ack statuses: looked up by _sendAck(); the result is sent as the `code` of
// the update inside an "ack" message.
26 const kACK_STATUS_TO_CODE = {
27 [Ci.nsIPushErrorReporter.ACK_DELIVERED]: 100,
28 [Ci.nsIPushErrorReporter.ACK_DECRYPTION_ERROR]: 101,
29 [Ci.nsIPushErrorReporter.ACK_NOT_DELIVERED]: 102,
// Unsubscribe reasons: looked up by unregister() to translate the caller's
// `reason` argument into a numeric code.
32 const kUNREGISTER_REASON_TO_CODE = {
33 [Ci.nsIPushErrorReporter.UNSUBSCRIBE_MANUAL]: 200,
34 [Ci.nsIPushErrorReporter.UNSUBSCRIBE_QUOTA_EXCEEDED]: 201,
35 [Ci.nsIPushErrorReporter.UNSUBSCRIBE_PERMISSION_REVOKED]: 202,
// Delivery error reasons: looked up by reportDeliveryError(); the result is
// sent as the `code` of a "nack" message.
38 const kDELIVERY_REASON_TO_CODE = {
39 [Ci.nsIPushErrorReporter.DELIVERY_UNCAUGHT_EXCEPTION]: 301,
40 [Ci.nsIPushErrorReporter.DELIVERY_UNHANDLED_REJECTION]: 302,
41 [Ci.nsIPushErrorReporter.DELIVERY_INTERNAL_ERROR]: 303,
// Preference branch for "dom.push."; the pref names used with it in this file
// ("userAgentID", "requestTimeout", "pingInterval", ...) omit that prefix.
44 const prefs = Services.prefs.getBranch("dom.push.");
// Defines `lazy.console`, the logger used throughout this file. The getter
// builds a ConsoleAPI instance whose maximum log level is taken from the
// "dom.push.loglevel" preference and whose messages carry the
// "PushServiceWebSocket" prefix.
46 ChromeUtils.defineLazyGetter(lazy, "console", () => {
47 let { ConsoleAPI } = ChromeUtils.importESModule(
48 "resource://gre/modules/Console.sys.mjs"
50 return new ConsoleAPI({
51 maxLogLevelPref: "dom.push.loglevel",
52 prefix: "PushServiceWebSocket",
57 * A proxy between the PushService and the WebSocket. The listener is used so
58 * that the PushService can silence messages from the WebSocket by setting
59 * PushWebSocketListener._pushService to null. This is required because
60 * a WebSocket can continue to send messages or errors after it has been
61 * closed but the PushService may not be interested in these. It's easier to
62 * stop listening than to have checks at specific points.
// @param pushService  Object whose _wsOn* handlers receive the forwarded
//                     WebSocket events. _shutdownWS() resets the stored
//                     reference to null to silence this listener.
64 var PushWebSocketListener = function (pushService) {
65 this._pushService = pushService;
68 PushWebSocketListener.prototype = {
70 if (!this._pushService) {
73 this._pushService._wsOnStart(context);
76 onStop(context, statusCode) {
77 if (!this._pushService) {
80 this._pushService._wsOnStop(context, statusCode);
87 onBinaryMessageAvailable() {
91 onMessageAvailable(context, message) {
92 if (!this._pushService) {
95 this._pushService._wsOnMessageAvailable(context, message);
98 onServerClose(context, aStatusCode, aReason) {
99 if (!this._pushService) {
102 this._pushService._wsOnServerClose(context, aStatusCode, aReason);
// Connection states stored in PushServiceWebSocket._currentState.
// Not connected: the initial value, and the state set by _shutdownWS().
108 const STATE_SHUT_DOWN = 0;
109 // Websocket has been opened on client side, waiting for successful open.
111 const STATE_WAITING_FOR_WS_START = 1;
112 // Websocket opened, hello sent, waiting for server reply (_handleHelloReply).
113 const STATE_WAITING_FOR_HELLO = 2;
114 // Websocket operational, handshake completed, begin protocol messaging.
115 const STATE_READY = 3;
117 export var PushServiceWebSocket = {
118 QueryInterface: ChromeUtils.generateQI(["nsINamed", "nsIObserver"]),
119 name: "PushServiceWebSocket",
121 _mainPushService: null,
123 _currentlyRegistering: new Set(),
128 kPUSHWSDB_DB_VERSION,
129 kPUSHWSDB_STORE_NAME,
// Observer callback shared by the "userAgentID" pref observer (registered in
// _handleHelloReply, removed in _shutdownWS) and by the timers this object
// initialises with itself as the callback target.
139 observe(aSubject, aTopic, aData) {
140 if (aTopic == "nsPref:changed" && aData == "dom.push.userAgentID") {
141 this._onUAIDChanged();
142 } else if (aTopic == "timer-callback") {
// aSubject is the timer that fired; _onTimerFired() compares it against the
// ping, backoff and request-timeout timers.
143 this._onTimerFired(aSubject);
148 * Handles a UAID change. Unlike reconnects, we cancel all pending requests
149 * after disconnecting. Existing subscriptions stored in IndexedDB will be
150 * dropped on reconnect.
153 lazy.console.debug("onUAIDChanged()");
156 this._startBackoffTimer();
159 /** Handles a ping, backoff, or request timeout timer event. */
160 _onTimerFired(timer) {
161 lazy.console.debug("onTimerFired()");
163 if (timer == this._pingTimer) {
168 if (timer == this._backoffTimer) {
169 lazy.console.debug("onTimerFired: Reconnecting after backoff");
170 this._beginWSSetup();
174 if (timer == this._requestTimeoutTimer) {
175 this._timeOutRequests();
180 * Sends a ping to the server. Bypasses the request queue, but starts the
181 * request timeout timer. If the socket is already closed, or the server
182 * does not respond within the timeout, the client will reconnect.
185 lazy.console.debug("sendPing()");
187 this._startRequestTimeoutTimer();
189 this._wsSendMessage({});
190 this._lastPingTime = Date.now();
192 lazy.console.debug("sendPing: Error sending ping", e);
197 /** Times out any pending requests. */
199 lazy.console.debug("timeOutRequests()");
201 if (!this._hasPendingRequests()) {
202 // Cancel the repeating timer and exit early if we aren't waiting for
203 // pongs or requests.
204 this._requestTimeoutTimer.cancel();
208 let now = Date.now();
210 // Set to true if at least one request timed out, or we're still waiting
211 // for a pong after the request timeout.
212 let requestTimedOut = false;
215 this._lastPingTime > 0 &&
216 now - this._lastPingTime > this._requestTimeout
218 lazy.console.debug("timeOutRequests: Did not receive pong in time");
219 requestTimedOut = true;
221 for (let [key, request] of this._pendingRequests) {
222 let duration = now - request.ctime;
223 // If any of the registration requests time out, all the ones after it
224 // are also made to fail, since we are going to be disconnecting the
226 requestTimedOut |= duration > this._requestTimeout;
227 if (requestTimedOut) {
228 request.reject(new Error("Request timed out: " + key));
229 this._pendingRequests.delete(key);
234 // The most likely reason for a pong or registration request timing out is
235 // that the socket has disconnected. Best to reconnect.
236 if (requestTimedOut) {
242 return prefs.getStringPref("userAgentID");
246 if (typeof newID !== "string") {
248 "Got invalid, non-string UAID",
250 "Not updating userAgentID"
254 lazy.console.debug("New _UAID", newID);
255 prefs.setStringPref("userAgentID", newID);
259 _pendingRequests: new Map(),
260 _currentState: STATE_SHUT_DOWN,
262 _requestTimeoutTimer: null,
266 * According to the WS spec, servers should immediately close the underlying
267 * TCP connection after they close a WebSocket. This causes wsOnStop to be
268 * called with error NS_BASE_STREAM_CLOSED. Since the client has to keep the
269 * WebSocket up, it should try to reconnect. But if the server closes the
270 * WebSocket because it wants the client to back off, then the client
271 * shouldn't re-establish the connection. If the server sends the backoff
272 * close code, this field will be set to true in wsOnServerClose. It is
273 * checked in wsOnStop.
275 _skipReconnect: false,
277 /** Indicates whether the server supports Web Push-style message delivery. */
281 * The last time the client sent a ping to the server. If non-zero, keeps the
282 * request timeout timer active. Reset to zero when the server responds with
283 * a pong or pending messages.
288 * A one-shot timer used to ping the server, to avoid timing out idle
289 * connections. Reset to the ping interval on each incoming message.
293 /** A one-shot timer fired after the reconnect backoff period. */
297 * Sends a message to the Push Server through an open websocket.
298 * typeof(msg) shall be an object
300 _wsSendMessage(msg) {
303 "wsSendMessage: No WebSocket initialized.",
304 "Cannot send a message"
308 msg = JSON.stringify(msg);
309 lazy.console.debug("wsSendMessage: Sending message", msg);
310 this._ws.sendMsg(msg);
// Stores the collaborators and settings needed before connecting.
// @param options          May carry `makeWebSocket`, a factory that replaces
//                         _makeWebSocket.
// @param mainPushService  Stored in _mainPushService.
// @param serverURI        Stored in _serverURI; read back by _beginWSSetup().
// @returns a promise that is already resolved.
313 init(options, mainPushService, serverURI) {
314 lazy.console.debug("init()");
316 this._mainPushService = mainPushService;
317 this._serverURI = serverURI;
318 // Filled in at connect() time
319 this._broadcastListeners = null;
321 // Override the default WebSocket factory function. The returned object
322 // must be null or satisfy the nsIWebSocketChannel interface. Used by
323 // the tests to provide a mock WebSocket implementation.
324 if (options.makeWebSocket) {
325 this._makeWebSocket = options.makeWebSocket;
// Read once here; used as the interval of the repeating request timeout
// timer and as the age threshold in _timeOutRequests().
328 this._requestTimeout = prefs.getIntPref("requestTimeout");
330 return Promise.resolve();
334 lazy.console.debug("reconnect()");
335 this._shutdownWS(false);
336 this._startBackoffTimer();
339 _shutdownWS(shouldCancelPending = true) {
340 lazy.console.debug("shutdownWS()");
342 if (this._currentState == STATE_READY) {
343 prefs.removeObserver("userAgentID", this);
346 this._currentState = STATE_SHUT_DOWN;
347 this._skipReconnect = false;
349 if (this._wsListener) {
350 this._wsListener._pushService = null;
353 this._ws.close(0, null);
357 this._lastPingTime = 0;
359 if (this._pingTimer) {
360 this._pingTimer.cancel();
363 if (shouldCancelPending) {
364 this._cancelPendingRequests();
367 if (this._notifyRequestQueue) {
368 this._notifyRequestQueue();
369 this._notifyRequestQueue = null;
374 // All pending requests (ideally none) are dropped at this point. We
375 // shouldn't have any applications performing registration/unregistration
376 // or receiving notifications.
379 if (this._backoffTimer) {
380 this._backoffTimer.cancel();
382 if (this._requestTimeoutTimer) {
383 this._requestTimeoutTimer.cancel();
386 this._mainPushService = null;
388 this._dataEnabled = false;
392 * How retries work: If the WS is closed due to a socket error,
393 * _startBackoffTimer() is called. The retry timer is started and when
394 * it times out, beginWSSetup() is called again.
396 * If we are in the middle of a timeout (i.e. waiting), but
397 * a register/unregister is called, we don't want to wait around anymore.
398 * _sendRequest will automatically call beginWSSetup(), which will cancel the
399 * timer. In addition, since the state will have changed, even if a pending
400 * timer event comes in (because the timer fired the event before it was
401 * cancelled), the connection won't be reset.
403 _startBackoffTimer() {
404 lazy.console.debug("startBackoffTimer()");
406 // Calculate new timeout, but cap it to pingInterval.
408 prefs.getIntPref("retryBaseInterval") * Math.pow(2, this._retryFailCount);
409 retryTimeout = Math.min(retryTimeout, prefs.getIntPref("pingInterval"));
411 this._retryFailCount++;
414 "startBackoffTimer: Retry in",
420 if (!this._backoffTimer) {
421 this._backoffTimer = Cc["@mozilla.org/timer;1"].createInstance(
425 this._backoffTimer.init(this, retryTimeout, Ci.nsITimer.TYPE_ONE_SHOT);
428 /** Indicates whether we're waiting for pongs or requests. */
429 _hasPendingRequests() {
// _lastPingTime is set when a ping is sent and cleared to 0 when any message
// arrives; _pendingRequests holds requests still waiting for a server reply.
430 return this._lastPingTime > 0 || this._pendingRequests.size > 0;
434 * Starts the request timeout timer unless we're already waiting for a pong
435 * or register request.
437 _startRequestTimeoutTimer() {
438 if (this._hasPendingRequests()) {
441 if (!this._requestTimeoutTimer) {
442 this._requestTimeoutTimer = Cc["@mozilla.org/timer;1"].createInstance(
446 this._requestTimeoutTimer.init(
448 this._requestTimeout,
449 Ci.nsITimer.TYPE_REPEATING_SLACK
453 /** Starts or resets the ping timer. */
455 if (!this._pingTimer) {
456 this._pingTimer = Cc["@mozilla.org/timer;1"].createInstance(Ci.nsITimer);
458 this._pingTimer.init(
460 prefs.getIntPref("pingInterval"),
461 Ci.nsITimer.TYPE_ONE_SHOT
465 _makeWebSocket(uri) {
466 if (!prefs.getBoolPref("connection.enabled")) {
468 "makeWebSocket: connection.enabled is not set to true.",
473 if (Services.io.offline) {
474 lazy.console.warn("makeWebSocket: Network is offline.");
479 ? "@mozilla.org/network/protocol;1?name=ws"
480 : "@mozilla.org/network/protocol;1?name=wss";
481 let socket = Cc[contractId].createInstance(Ci.nsIWebSocketChannel);
484 null, // aLoadingNode
485 Services.scriptSecurityManager.getSystemPrincipal(),
486 null, // aTriggeringPrincipal
487 Ci.nsILoadInfo.SEC_ALLOW_CROSS_ORIGIN_SEC_CONTEXT_IS_NULL,
488 Ci.nsIContentPolicy.TYPE_WEBSOCKET
490 // Allow deprecated HTTP request from SystemPrincipal
491 socket.loadInfo.allowDeprecatedSystemRequests = true;
497 lazy.console.debug("beginWSSetup()");
498 if (this._currentState != STATE_SHUT_DOWN) {
500 "_beginWSSetup: Not in shutdown state! Current state",
506 // Stop any pending reconnects scheduled for the near future.
507 if (this._backoffTimer) {
508 this._backoffTimer.cancel();
511 let uri = this._serverURI;
515 let socket = this._makeWebSocket(uri);
519 this._ws = socket.QueryInterface(Ci.nsIWebSocketChannel);
521 lazy.console.debug("beginWSSetup: Connecting to", uri.spec);
522 this._wsListener = new PushWebSocketListener(this);
523 this._ws.protocol = "push-notification";
526 // Grab a wakelock before we open the socket to ensure we don't go to
527 // sleep before the connection is opened.
528 this._ws.asyncOpen(uri, uri.spec, {}, 0, this._wsListener, null);
529 this._currentState = STATE_WAITING_FOR_WS_START;
532 "beginWSSetup: Error opening websocket.",
// Remembers the broadcast listeners, which _sendHello() later sends as the
// `broadcasts` field of the "hello" message, then starts WebSocket setup.
540 connect(broadcastListeners) {
541 lazy.console.debug("connect()", broadcastListeners);
542 this._broadcastListeners = broadcastListeners;
543 this._beginWSSetup();
551 * Protocol handler invoked by server message.
553 _handleHelloReply(reply) {
554 lazy.console.debug("handleHelloReply()");
555 if (this._currentState != STATE_WAITING_FOR_HELLO) {
557 "handleHelloReply: Unexpected state",
559 "(expected STATE_WAITING_FOR_HELLO)"
565 if (typeof reply.uaid !== "string") {
566 lazy.console.error("handleHelloReply: Received invalid UAID", reply.uaid);
571 if (reply.uaid === "") {
572 lazy.console.error("handleHelloReply: Received empty UAID");
577 // To avoid sticking extra large values sent by an evil server into prefs.
578 if (reply.uaid.length > 128) {
580 "handleHelloReply: UAID received from server was too long",
587 let sendRequests = () => {
588 if (this._notifyRequestQueue) {
589 this._notifyRequestQueue();
590 this._notifyRequestQueue = null;
592 this._sendPendingRequests();
595 function finishHandshake() {
596 this._UAID = reply.uaid;
597 this._currentState = STATE_READY;
598 prefs.addObserver("userAgentID", this);
600 // Handle broadcasts received in response to the "hello" message.
601 if (!lazy.ObjectUtils.isEmpty(reply.broadcasts)) {
602 // The reply isn't technically a broadcast message, but it has
603 // the shape of a broadcast message (it has a broadcasts field).
604 const context = { phase: lazy.pushBroadcastService.PHASES.HELLO };
605 this._mainPushService.receivedBroadcastMessage(reply, context);
608 this._dataEnabled = !!reply.use_webpush;
609 if (this._dataEnabled) {
610 this._mainPushService
614 records.map(record =>
615 this._mainPushService.ensureCrypto(record).catch(error => {
617 "finishHandshake: Error updating record",
631 // By this point we've got a UAID from the server that we are ready to
634 // We unconditionally drop all existing registrations and notify service
635 // workers if we receive a new UAID. This ensures we expunge all stale
636 // registrations if the `userAgentID` pref is reset.
637 if (this._UAID != reply.uaid) {
638 lazy.console.debug("handleHelloReply: Received new UAID");
640 this._mainPushService
641 .dropUnexpiredRegistrations()
642 .then(finishHandshake.bind(this));
647 // otherwise we are good to go
648 finishHandshake.bind(this)();
652 * Protocol handler invoked by server message.
654 _handleRegisterReply(reply) {
655 lazy.console.debug("handleRegisterReply()");
657 let tmp = this._takeRequestForReply(reply);
662 if (reply.status == 200) {
664 Services.io.newURI(reply.pushEndpoint);
666 tmp.reject(new Error("Invalid push endpoint: " + reply.pushEndpoint));
670 let record = new PushRecordWebSocket({
671 channelID: reply.channelID,
672 pushEndpoint: reply.pushEndpoint,
673 scope: tmp.record.scope,
674 originAttributes: tmp.record.originAttributes,
676 systemRecord: tmp.record.systemRecord,
677 appServerKey: tmp.record.appServerKey,
683 "handleRegisterReply: Unexpected server response",
687 new Error("Wrong status code for register reply: " + reply.status)
692 _handleUnregisterReply(reply) {
693 lazy.console.debug("handleUnregisterReply()");
695 let request = this._takeRequestForReply(reply);
700 let success = reply.status === 200;
701 request.resolve(success);
704 _handleDataUpdate(update) {
706 if (typeof update.channelID != "string") {
708 "handleDataUpdate: Discarding update without channel ID",
713 function updateRecord(record) {
714 // Ignore messages that we've already processed. This can happen if the
715 // connection drops between notifying the service worker and acking the
716 // message. In that case, the server will re-send the message on
718 if (record.hasRecentMessageID(update.version)) {
720 "handleDataUpdate: Ignoring duplicate message",
725 record.noteRecentMessageID(update.version);
728 if (typeof update.data != "string") {
729 promise = this._mainPushService.receivedPushMessage(
737 let message = ChromeUtils.base64URLDecode(update.data, {
738 // The Push server may append padding.
741 promise = this._mainPushService.receivedPushMessage(
752 this._sendAck(update.channelID, update.version, status);
756 "handleDataUpdate: Error delivering message",
763 Ci.nsIPushErrorReporter.ACK_DECRYPTION_ERROR
769 "handleDataUpdate: Error acknowledging message",
777 * Protocol handler invoked by server message.
779 _handleNotificationReply(reply) {
780 lazy.console.debug("handleNotificationReply()");
781 if (this._dataEnabled) {
782 this._handleDataUpdate(reply);
786 if (typeof reply.updates !== "object") {
788 "handleNotificationReply: Missing updates",
794 lazy.console.debug("handleNotificationReply: Got updates", reply.updates);
795 for (let i = 0; i < reply.updates.length; i++) {
796 let update = reply.updates[i];
797 lazy.console.debug("handleNotificationReply: Handling update", update);
798 if (typeof update.channelID !== "string") {
800 "handleNotificationReply: Invalid update at index",
807 if (update.version === undefined) {
808 lazy.console.debug("handleNotificationReply: Missing version", update);
812 let version = update.version;
814 if (typeof version === "string") {
815 version = parseInt(version, 10);
818 if (typeof version === "number" && version >= 0) {
819 // FIXME(nsm): this relies on app update notification being infallible!
820 // eventually fix this
821 this._receivedUpdate(update.channelID, version);
// Forwards a broadcast reply to the main push service, tagged with the phase
// it belongs to: REGISTER if any broadcast ID in the reply was subscribed via
// sendSubscribeBroadcast(), BROADCAST otherwise.
// NOTE(review): reply.broadcasts is handed to Object.keys() unchecked, which
// throws a TypeError when the field is missing or null — confirm whether the
// server guarantees it or a guard is wanted.
826 _handleBroadcastReply(reply) {
827 let phase = lazy.pushBroadcastService.PHASES.BROADCAST;
828 // Check if this reply is the result of registration.
829 for (const id of Object.keys(reply.broadcasts)) {
// delete() also removes the ID, so a later broadcast for the same service is
// no longer treated as a registration result.
830 const wasRegistering = this._currentlyRegistering.delete(id);
831 if (wasRegistering) {
832 // If we get multiple broadcasts and only one is "registering",
833 // then we consider the phase to be REGISTER for all of them.
834 // It is acceptable since registrations do not happen so often,
835 // and are all very likely to occur soon after browser startup.
836 phase = lazy.pushBroadcastService.PHASES.REGISTER;
839 const context = { phase };
840 this._mainPushService.receivedBroadcastMessage(reply, context);
843 reportDeliveryError(messageID, reason) {
844 lazy.console.debug("reportDeliveryError()");
845 let code = kDELIVERY_REASON_TO_CODE[reason];
847 throw new Error("Invalid delivery error reason");
849 let data = { messageType: "nack", version: messageID, code };
850 this._queueRequest(data);
853 _sendAck(channelID, version, status) {
854 lazy.console.debug("sendAck()");
855 let code = kACK_STATUS_TO_CODE[status];
857 throw new Error("Invalid ack status");
859 let data = { messageType: "ack", updates: [{ channelID, version, code }] };
860 this._queueRequest(data);
864 // generateUUID() gives a UUID surrounded by {...}, slice them off.
865 return Services.uuid.generateUUID().toString().slice(1, -1);
869 lazy.console.debug("register() ", record);
871 let data = { channelID: this._generateID(), messageType: "register" };
873 if (record.appServerKey) {
874 data.key = ChromeUtils.base64URLEncode(record.appServerKey, {
875 // The Push server requires padding.
880 return this._sendRequestForReply(record, data).then(record => {
881 if (!this._dataEnabled) {
884 return PushCrypto.generateKeys().then(([publicKey, privateKey]) => {
885 record.p256dhPublicKey = publicKey;
886 record.p256dhPrivateKey = privateKey;
887 record.authenticationSecret = PushCrypto.generateAuthenticationSecret();
893 unregister(record, reason) {
894 lazy.console.debug("unregister() ", record, reason);
896 return Promise.resolve().then(_ => {
897 let code = kUNREGISTER_REASON_TO_CODE[reason];
899 throw new Error("Invalid unregister reason");
902 channelID: record.channelID,
903 messageType: "unregister",
907 return this._sendRequestForReply(record, data);
// Already-resolved promise that _enqueue() assigns to `this._queue` as the
// start of the request chain.
911 _queueStart: Promise.resolve(),
// Resolver of the promise that _queueRequest() places at the head of the
// queue while the connection is being set up. It is called and reset to null
// in _handleHelloReply, in _shutdownWS, and in _queueRequest when no socket
// could be created.
912 _notifyRequestQueue: null,
915 lazy.console.debug("enqueue()");
917 this._queue = this._queueStart;
919 this._queue = this._queue.then(op).catch(_ => {});
922 /** Sends a request to the server. */
924 if (this._currentState != STATE_READY) {
926 "send: Unexpected state; ignoring message",
931 if (!this._requestHasReply(data)) {
932 this._wsSendMessage(data);
935 // If we're expecting a reply, check that we haven't cancelled the request.
936 let key = this._makePendingRequestKey(data);
937 if (!this._pendingRequests.has(key)) {
938 lazy.console.log("send: Request cancelled; ignoring message", key);
941 this._wsSendMessage(data);
944 /** Indicates whether a request has a corresponding reply from the server. */
945 _requestHasReply(data) {
// Only "register" and "unregister" are answered by the server; register()
// and unregister() track them in _pendingRequests via _sendRequestForReply().
946 return data.messageType == "register" || data.messageType == "unregister";
950 * Sends all pending requests that expect replies. Called after the connection
951 * is established and the handshake is complete.
953 _sendPendingRequests() {
955 for (let request of this._pendingRequests.values()) {
956 this._send(request.data);
961 /** Queues an outgoing request, establishing a connection if necessary. */
962 _queueRequest(data) {
963 lazy.console.debug("queueRequest()", data);
965 if (this._currentState == STATE_READY) {
966 // If we're ready, no need to queue; just send the request.
971 // Otherwise, we're still setting up. If we don't have a request queue,
973 if (!this._notifyRequestQueue) {
974 let promise = new Promise(resolve => {
975 this._notifyRequestQueue = resolve;
977 this._enqueue(_ => promise);
980 let isRequest = this._requestHasReply(data);
982 // Don't queue requests, since they're stored in `_pendingRequests`, and
983 // `_sendPendingRequests` will send them after reconnecting. Without this
984 // check, we'd send requests twice.
985 this._enqueue(_ => this._send(data));
989 // This will end up calling notifyRequestQueue().
990 this._beginWSSetup();
991 // If beginWSSetup does not succeed to make ws, notifyRequestQueue will
993 if (!this._ws && this._notifyRequestQueue) {
994 this._notifyRequestQueue();
995 this._notifyRequestQueue = null;
1000 _receivedUpdate(aChannelID, aLatestVersion) {
1002 "receivedUpdate: Updating",
1008 this._mainPushService
1009 .receivedPushMessage(aChannelID, "", null, null, record => {
1010 if (record.version === null || record.version < aLatestVersion) {
1012 "receivedUpdate: Version changed for",
1016 record.version = aLatestVersion;
1020 "receivedUpdate: No significant version change for",
1027 this._sendAck(aChannelID, aLatestVersion, status);
1031 "receivedUpdate: Error acknowledging message",
1039 // begin Push protocol handshake
1041 lazy.console.debug("wsOnStart()");
1043 if (this._currentState != STATE_WAITING_FOR_WS_START) {
1045 "wsOnStart: NOT in STATE_WAITING_FOR_WS_START. Current",
1053 this._mainPushService
1056 records => this._sendHello(records),
1059 "Error fetching existing records before handshake; assuming none",
1062 this._sendHello([]);
1066 // If we failed to send the handshake, back off and reconnect.
1067 lazy.console.warn("Failed to send handshake; reconnecting", err);
1073 * Sends a `hello` handshake to the server.
1075 * @param {Array<PushRecordWebSocket>} An array of records for existing
1076 * subscriptions, used to determine whether to rotate our UAID.
1078 _sendHello(records) {
1080 messageType: "hello",
1081 broadcasts: this._broadcastListeners,
1085 if (records.length && this._UAID) {
1086 // Only send our UAID if we have existing push subscriptions, to
1087 // avoid tying a persistent identifier to the connection (bug
1088 // 1617136). The push server will issue our client a new UAID in
1089 // the `hello` response, which we'll store until either the next
1090 // time we reconnect, or the user subscribes to push. Once we have a
1091 // push subscription, we'll stop rotating the UAID when we connect,
1092 // so that we can receive push messages for them.
1093 data.uaid = this._UAID;
1096 this._wsSendMessage(data);
1097 this._currentState = STATE_WAITING_FOR_HELLO;
1101 * This statusCode is not the websocket protocol status code, but the TCP
1102 * connection close status code.
1104 * If we do not explicitly call ws.close() then statusCode is always
1105 * NS_BASE_STREAM_CLOSED, even on a successful close.
1107 _wsOnStop(context, statusCode) {
1108 lazy.console.debug("wsOnStop()");
1110 if (statusCode != Cr.NS_OK && !this._skipReconnect) {
1112 "wsOnStop: Reconnecting after socket error",
1122 _wsOnMessageAvailable(context, message) {
1123 lazy.console.debug("wsOnMessageAvailable()", message);
1125 // Clearing the last ping time indicates we're no longer waiting for a pong.
1126 this._lastPingTime = 0;
1130 reply = JSON.parse(message);
1132 lazy.console.warn("wsOnMessageAvailable: Invalid JSON", message, e);
1136 // If we receive a message, we know the connection succeeded. Reset the
1137 // connection attempt and ping interval counters.
1138 this._retryFailCount = 0;
1140 let doNotHandle = false;
1143 reply.messageType === undefined ||
1144 reply.messageType === "ping" ||
1145 typeof reply.messageType != "string"
1147 lazy.console.debug("wsOnMessageAvailable: Pong received");
1151 // Reset the ping timer. Note: This path is executed at every step of the
1152 // handshake, so this timer does not need to be set explicitly at startup.
1153 this._startPingTimer();
1155 // If it is a ping, do not handle the message.
1157 if (!this._hasPendingRequests()) {
1158 this._requestTimeoutTimer.cancel();
1163 // An allowlist of protocol handlers. Add to these if new messages are added
1173 // Build up the handler name to call from messageType.
1174 // e.g. messageType == "register" -> _handleRegisterReply.
1176 reply.messageType[0].toUpperCase() +
1177 reply.messageType.slice(1).toLowerCase();
1179 if (!handlers.includes(handlerName)) {
1181 "wsOnMessageAvailable: No allowlisted handler",
1189 let handler = "_handle" + handlerName + "Reply";
1191 if (typeof this[handler] !== "function") {
1193 "wsOnMessageAvailable: Handler",
1195 "allowlisted but not implemented"
1200 this[handler](reply);
1204 * The websocket should never be closed. Since we don't call ws.close(),
1205 * _wsOnStop() receives error code NS_BASE_STREAM_CLOSED (see comment in that
1206 * function), which calls reconnect and re-establishes the WebSocket
1209 * If the server requested that we back off, we won't reconnect until the
1210 * next network state change event, or until we need to send a new register
// Records a server-requested back-off. Nothing else happens here: the flag is
// checked by _wsOnStop() and reset to false by _shutdownWS().
1213 _wsOnServerClose(context, aStatusCode, aReason) {
1214 lazy.console.debug("wsOnServerClose()", aStatusCode, aReason);
1216 if (aStatusCode == kBACKOFF_WS_STATUS_CODE) {
1217 lazy.console.debug("wsOnServerClose: Skipping automatic reconnect");
1218 this._skipReconnect = true;
1223 * Rejects all pending requests that are awaiting a server reply.
// Every promise handed out by _sendRequestForReply() that is still pending
// rejects with Error("Request aborted"); the map is then emptied.
1225 _cancelPendingRequests() {
1226 for (let request of this._pendingRequests.values()) {
1227 request.reject(new Error("Request aborted"));
1229 this._pendingRequests.clear();
1232 /** Creates a case-insensitive map key for a request that expects a reply. */
1233 _makePendingRequestKey(data) {
// Key format: "<messageType>|<channelID>", lower-cased. Built from outgoing
// requests here and from server replies in _takeRequestForReply(), so both
// sides of an exchange map to the same key.
1234 return (data.messageType + "|" + data.channelID).toLowerCase();
1237 /** Sends a request and waits for a reply from the server. */
1238 _sendRequestForReply(record, data) {
1239 return Promise.resolve().then(_ => {
1240 // start the timer since we now have at least one request
1241 this._startRequestTimeoutTimer();
1243 let key = this._makePendingRequestKey(data);
1244 if (!this._pendingRequests.has(key)) {
1250 request.promise = new Promise((resolve, reject) => {
1251 request.resolve = resolve;
1252 request.reject = reject;
1254 this._pendingRequests.set(key, request);
1255 this._queueRequest(data);
1258 return this._pendingRequests.get(key).promise;
1262 /** Removes and returns a pending request for a server reply. */
1263 _takeRequestForReply(reply) {
1264 if (typeof reply.channelID !== "string") {
1267 let key = this._makePendingRequestKey(reply);
1268 let request = this._pendingRequests.get(key);
1272 this._pendingRequests.delete(key);
1273 if (!this._hasPendingRequests()) {
1274 this._requestTimeoutTimer.cancel();
1279 sendSubscribeBroadcast(serviceId, version) {
1280 this._currentlyRegistering.add(serviceId);
1282 messageType: "broadcast_subscribe",
1284 [serviceId]: version,
1288 this._queueRequest(data);
// Record type for WebSocket subscriptions: runs the PushRecord constructor on
// `record`, then copies its `channelID` and `version` onto the new instance.
1292 function PushRecordWebSocket(record) {
1293 PushRecord.call(this, record);
1294 this.channelID = record.channelID;
1295 this.version = record.version;
1298 PushRecordWebSocket.prototype = Object.create(PushRecord.prototype, {
1301 return this.channelID;
// Builds the subscription object via PushRecord.prototype.toSubscription and
// adds this record's `version` to it.
1306 PushRecordWebSocket.prototype.toSubscription = function () {
1307 let subscription = PushRecord.prototype.toSubscription.call(this);
1308 subscription.version = this.version;
1309 return subscription;