1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
3 * You can obtain one at http://mozilla.org/MPL/2.0/. */
7 const { Services } = ChromeUtils.import("resource://gre/modules/Services.jsm");
8 const { XPCOMUtils } = ChromeUtils.import(
9 "resource://gre/modules/XPCOMUtils.jsm"
12 const { PushDB } = ChromeUtils.import("resource://gre/modules/PushDB.jsm");
13 const { PushRecord } = ChromeUtils.import(
14 "resource://gre/modules/PushRecord.jsm"
16 const { PushCrypto } = ChromeUtils.import(
17 "resource://gre/modules/PushCrypto.jsm"
20 ChromeUtils.defineModuleGetter(
22 "pushBroadcastService",
23 "resource://gre/modules/PushBroadcastService.jsm"
25 ChromeUtils.defineModuleGetter(
28 "resource://gre/modules/ObjectUtils.jsm"
31 const kPUSHWSDB_DB_NAME = "pushapi";
32 const kPUSHWSDB_DB_VERSION = 5; // Change this if the IndexedDB format changes
33 const kPUSHWSDB_STORE_NAME = "pushapi";
35 // WebSocket close code sent by the server to indicate that the client should
36 // not automatically reconnect.
37 const kBACKOFF_WS_STATUS_CODE = 4774;
39 // Maps ack statuses, unsubscribe reasons, and delivery error reasons to codes
40 // included in request payloads.
41 const kACK_STATUS_TO_CODE = {
42 [Ci.nsIPushErrorReporter.ACK_DELIVERED]: 100,
43 [Ci.nsIPushErrorReporter.ACK_DECRYPTION_ERROR]: 101,
44 [Ci.nsIPushErrorReporter.ACK_NOT_DELIVERED]: 102,
47 const kUNREGISTER_REASON_TO_CODE = {
48 [Ci.nsIPushErrorReporter.UNSUBSCRIBE_MANUAL]: 200,
49 [Ci.nsIPushErrorReporter.UNSUBSCRIBE_QUOTA_EXCEEDED]: 201,
50 [Ci.nsIPushErrorReporter.UNSUBSCRIBE_PERMISSION_REVOKED]: 202,
53 const kDELIVERY_REASON_TO_CODE = {
54 [Ci.nsIPushErrorReporter.DELIVERY_UNCAUGHT_EXCEPTION]: 301,
55 [Ci.nsIPushErrorReporter.DELIVERY_UNHANDLED_REJECTION]: 302,
56 [Ci.nsIPushErrorReporter.DELIVERY_INTERNAL_ERROR]: 303,
59 const prefs = Services.prefs.getBranch("dom.push.");
61 const EXPORTED_SYMBOLS = ["PushServiceWebSocket"];
63 XPCOMUtils.defineLazyGetter(this, "console", () => {
64 let { ConsoleAPI } = ChromeUtils.import("resource://gre/modules/Console.jsm");
65 return new ConsoleAPI({
66 maxLogLevelPref: "dom.push.loglevel",
67 prefix: "PushServiceWebSocket",
72 * A proxy between the PushService and the WebSocket. The listener is used so
73 * that the PushService can silence messages from the WebSocket by setting
74 * PushWebSocketListener._pushService to null. This is required because
75 * a WebSocket can continue to send messages or errors after it has been
76 * closed but the PushService may not be interested in these. It's easier to
77 * stop listening than to have checks at specific points.
79 var PushWebSocketListener = function(pushService) {
80 this._pushService = pushService;
83 PushWebSocketListener.prototype = {
85 if (!this._pushService) {
88 this._pushService._wsOnStart(context);
91 onStop(context, statusCode) {
92 if (!this._pushService) {
95 this._pushService._wsOnStop(context, statusCode);
98 onAcknowledge(context, size) {
102 onBinaryMessageAvailable(context, message) {
106 onMessageAvailable(context, message) {
107 if (!this._pushService) {
110 this._pushService._wsOnMessageAvailable(context, message);
113 onServerClose(context, aStatusCode, aReason) {
114 if (!this._pushService) {
117 this._pushService._wsOnServerClose(context, aStatusCode, aReason);
123 const STATE_SHUT_DOWN = 0;
124 // Websocket has been opened on client side, waiting for successful open.
126 const STATE_WAITING_FOR_WS_START = 1;
127 // Websocket opened, hello sent, waiting for server reply (_handleHelloReply).
128 const STATE_WAITING_FOR_HELLO = 2;
129 // Websocket operational, handshake completed, begin protocol messaging.
130 const STATE_READY = 3;
132 var PushServiceWebSocket = {
133 _mainPushService: null,
135 _currentlyRegistering: new Set(),
140 kPUSHWSDB_DB_VERSION,
141 kPUSHWSDB_STORE_NAME,
151 observe(aSubject, aTopic, aData) {
152 if (aTopic == "nsPref:changed" && aData == "dom.push.userAgentID") {
153 this._onUAIDChanged();
154 } else if (aTopic == "timer-callback") {
155 this._onTimerFired(aSubject);
160 * Handles a UAID change. Unlike reconnects, we cancel all pending requests
161 * after disconnecting. Existing subscriptions stored in IndexedDB will be
162 * dropped on reconnect.
165 console.debug("onUAIDChanged()");
168 this._startBackoffTimer();
171 /** Handles a ping, backoff, or request timeout timer event. */
172 _onTimerFired(timer) {
173 console.debug("onTimerFired()");
175 if (timer == this._pingTimer) {
180 if (timer == this._backoffTimer) {
181 console.debug("onTimerFired: Reconnecting after backoff");
182 this._beginWSSetup();
186 if (timer == this._requestTimeoutTimer) {
187 this._timeOutRequests();
192 * Sends a ping to the server. Bypasses the request queue, but starts the
193 * request timeout timer. If the socket is already closed, or the server
194 * does not respond within the timeout, the client will reconnect.
197 console.debug("sendPing()");
199 this._startRequestTimeoutTimer();
201 this._wsSendMessage({});
202 this._lastPingTime = Date.now();
204 console.debug("sendPing: Error sending ping", e);
209 /** Times out any pending requests. */
211 console.debug("timeOutRequests()");
213 if (!this._hasPendingRequests()) {
214 // Cancel the repeating timer and exit early if we aren't waiting for
215 // pongs or requests.
216 this._requestTimeoutTimer.cancel();
220 let now = Date.now();
222 // Set to true if at least one request timed out, or we're still waiting
223 // for a pong after the request timeout.
224 let requestTimedOut = false;
227 this._lastPingTime > 0 &&
228 now - this._lastPingTime > this._requestTimeout
230 console.debug("timeOutRequests: Did not receive pong in time");
231 requestTimedOut = true;
233 for (let [key, request] of this._pendingRequests) {
234 let duration = now - request.ctime;
235 // If any of the registration requests time out, all the ones after it
236 // also made to fail, since we are going to be disconnecting the
238 requestTimedOut |= duration > this._requestTimeout;
239 if (requestTimedOut) {
240 request.reject(new Error("Request timed out: " + key));
241 this._pendingRequests.delete(key);
246 // The most likely reason for a pong or registration request timing out is
247 // that the socket has disconnected. Best to reconnect.
248 if (requestTimedOut) {
254 return prefs.getStringPref("userAgentID");
258 if (typeof newID !== "string") {
260 "Got invalid, non-string UAID",
262 "Not updating userAgentID"
266 console.debug("New _UAID", newID);
267 prefs.setStringPref("userAgentID", newID);
271 _pendingRequests: new Map(),
272 _currentState: STATE_SHUT_DOWN,
274 _requestTimeoutTimer: null,
278 * According to the WS spec, servers should immediately close the underlying
279 * TCP connection after they close a WebSocket. This causes wsOnStop to be
280 * called with error NS_BASE_STREAM_CLOSED. Since the client has to keep the
281 * WebSocket up, it should try to reconnect. But if the server closes the
282 * WebSocket because it wants the client to back off, then the client
283 * shouldn't re-establish the connection. If the server sends the backoff
284 * close code, this field will be set to true in wsOnServerClose. It is
285 * checked in wsOnStop.
287 _skipReconnect: false,
289 /** Indicates whether the server supports Web Push-style message delivery. */
293 * The last time the client sent a ping to the server. If non-zero, keeps the
294 * request timeout timer active. Reset to zero when the server responds with
295 * a pong or pending messages.
300 * A one-shot timer used to ping the server, to avoid timing out idle
301 * connections. Reset to the ping interval on each incoming message.
305 /** A one-shot timer fired after the reconnect backoff period. */
309 * Sends a message to the Push Server through an open websocket.
310 * typeof(msg) shall be an object
312 _wsSendMessage(msg) {
315 "wsSendMessage: No WebSocket initialized.",
316 "Cannot send a message"
320 msg = JSON.stringify(msg);
321 console.debug("wsSendMessage: Sending message", msg);
322 this._ws.sendMsg(msg);
325 init(options, mainPushService, serverURI) {
326 console.debug("init()");
328 this._mainPushService = mainPushService;
329 this._serverURI = serverURI;
330 // Filled in at connect() time
331 this._broadcastListeners = null;
333 // Override the default WebSocket factory function. The returned object
334 // must be null or satisfy the nsIWebSocketChannel interface. Used by
335 // the tests to provide a mock WebSocket implementation.
336 if (options.makeWebSocket) {
337 this._makeWebSocket = options.makeWebSocket;
340 this._requestTimeout = prefs.getIntPref("requestTimeout");
342 return Promise.resolve();
346 console.debug("reconnect()");
347 this._shutdownWS(false);
348 this._startBackoffTimer();
351 _shutdownWS(shouldCancelPending = true) {
352 console.debug("shutdownWS()");
354 if (this._currentState == STATE_READY) {
355 prefs.removeObserver("userAgentID", this);
358 this._currentState = STATE_SHUT_DOWN;
359 this._skipReconnect = false;
361 if (this._wsListener) {
362 this._wsListener._pushService = null;
365 this._ws.close(0, null);
369 this._lastPingTime = 0;
371 if (this._pingTimer) {
372 this._pingTimer.cancel();
375 if (shouldCancelPending) {
376 this._cancelPendingRequests();
379 if (this._notifyRequestQueue) {
380 this._notifyRequestQueue();
381 this._notifyRequestQueue = null;
386 // All pending requests (ideally none) are dropped at this point. We
387 // shouldn't have any applications performing registration/unregistration
388 // or receiving notifications.
391 if (this._backoffTimer) {
392 this._backoffTimer.cancel();
394 if (this._requestTimeoutTimer) {
395 this._requestTimeoutTimer.cancel();
398 this._mainPushService = null;
400 this._dataEnabled = false;
404 * How retries work: If the WS is closed due to a socket error,
405 * _startBackoffTimer() is called. The retry timer is started and when
406 * it times out, beginWSSetup() is called again.
408 * If we are in the middle of a timeout (i.e. waiting), but
409 * a register/unregister is called, we don't want to wait around anymore.
410 * _sendRequest will automatically call beginWSSetup(), which will cancel the
411 * timer. In addition since the state will have changed, even if a pending
412 * timer event comes in (because the timer fired the event before it was
413 * cancelled), so the connection won't be reset.
415 _startBackoffTimer() {
416 console.debug("startBackoffTimer()");
418 // Calculate new timeout, but cap it to pingInterval.
420 prefs.getIntPref("retryBaseInterval") * Math.pow(2, this._retryFailCount);
421 retryTimeout = Math.min(retryTimeout, prefs.getIntPref("pingInterval"));
423 this._retryFailCount++;
426 "startBackoffTimer: Retry in",
432 if (!this._backoffTimer) {
433 this._backoffTimer = Cc["@mozilla.org/timer;1"].createInstance(
437 this._backoffTimer.init(this, retryTimeout, Ci.nsITimer.TYPE_ONE_SHOT);
440 /** Indicates whether we're waiting for pongs or requests. */
441 _hasPendingRequests() {
442 return this._lastPingTime > 0 || this._pendingRequests.size > 0;
446 * Starts the request timeout timer unless we're already waiting for a pong
447 * or register request.
449 _startRequestTimeoutTimer() {
450 if (this._hasPendingRequests()) {
453 if (!this._requestTimeoutTimer) {
454 this._requestTimeoutTimer = Cc["@mozilla.org/timer;1"].createInstance(
458 this._requestTimeoutTimer.init(
460 this._requestTimeout,
461 Ci.nsITimer.TYPE_REPEATING_SLACK
465 /** Starts or resets the ping timer. */
467 if (!this._pingTimer) {
468 this._pingTimer = Cc["@mozilla.org/timer;1"].createInstance(Ci.nsITimer);
470 this._pingTimer.init(
472 prefs.getIntPref("pingInterval"),
473 Ci.nsITimer.TYPE_ONE_SHOT
477 _makeWebSocket(uri) {
478 if (!prefs.getBoolPref("connection.enabled")) {
480 "makeWebSocket: connection.enabled is not set to true.",
485 if (Services.io.offline) {
486 console.warn("makeWebSocket: Network is offline.");
491 ? "@mozilla.org/network/protocol;1?name=ws"
492 : "@mozilla.org/network/protocol;1?name=wss";
493 let socket = Cc[contractId].createInstance(Ci.nsIWebSocketChannel);
496 null, // aLoadingNode
497 Services.scriptSecurityManager.getSystemPrincipal(),
498 null, // aTriggeringPrincipal
499 Ci.nsILoadInfo.SEC_ALLOW_CROSS_ORIGIN_SEC_CONTEXT_IS_NULL,
500 Ci.nsIContentPolicy.TYPE_WEBSOCKET
502 // Allow deprecated HTTP request from SystemPrincipal
503 socket.loadInfo.allowDeprecatedSystemRequests = true;
509 console.debug("beginWSSetup()");
510 if (this._currentState != STATE_SHUT_DOWN) {
512 "_beginWSSetup: Not in shutdown state! Current state",
518 // Stop any pending reconnects scheduled for the near future.
519 if (this._backoffTimer) {
520 this._backoffTimer.cancel();
523 let uri = this._serverURI;
527 let socket = this._makeWebSocket(uri);
531 this._ws = socket.QueryInterface(Ci.nsIWebSocketChannel);
533 console.debug("beginWSSetup: Connecting to", uri.spec);
534 this._wsListener = new PushWebSocketListener(this);
535 this._ws.protocol = "push-notification";
538 // Grab a wakelock before we open the socket to ensure we don't go to
539 // sleep before connection the is opened.
540 this._ws.asyncOpen(uri, uri.spec, {}, 0, this._wsListener, null);
541 this._currentState = STATE_WAITING_FOR_WS_START;
544 "beginWSSetup: Error opening websocket.",
552 connect(broadcastListeners) {
553 console.debug("connect()", broadcastListeners);
554 this._broadcastListeners = broadcastListeners;
555 this._beginWSSetup();
563 * Protocol handler invoked by server message.
565 _handleHelloReply(reply) {
566 console.debug("handleHelloReply()");
567 if (this._currentState != STATE_WAITING_FOR_HELLO) {
569 "handleHelloReply: Unexpected state",
571 "(expected STATE_WAITING_FOR_HELLO)"
577 if (typeof reply.uaid !== "string") {
578 console.error("handleHelloReply: Received invalid UAID", reply.uaid);
583 if (reply.uaid === "") {
584 console.error("handleHelloReply: Received empty UAID");
589 // To avoid sticking extra large values sent by an evil server into prefs.
590 if (reply.uaid.length > 128) {
592 "handleHelloReply: UAID received from server was too long",
599 let sendRequests = () => {
600 if (this._notifyRequestQueue) {
601 this._notifyRequestQueue();
602 this._notifyRequestQueue = null;
604 this._sendPendingRequests();
607 function finishHandshake() {
608 this._UAID = reply.uaid;
609 this._currentState = STATE_READY;
610 prefs.addObserver("userAgentID", this);
612 // Handle broadcasts received in response to the "hello" message.
613 if (!ObjectUtils.isEmpty(reply.broadcasts)) {
614 // The reply isn't technically a broadcast message, but it has
615 // the shape of a broadcast message (it has a broadcasts field).
616 const context = { phase: pushBroadcastService.PHASES.HELLO };
617 this._mainPushService.receivedBroadcastMessage(reply, context);
620 this._dataEnabled = !!reply.use_webpush;
621 if (this._dataEnabled) {
622 this._mainPushService
626 records.map(record =>
627 this._mainPushService.ensureCrypto(record).catch(error => {
629 "finishHandshake: Error updating record",
643 // By this point we've got a UAID from the server that we are ready to
646 // We unconditionally drop all existing registrations and notify service
647 // workers if we receive a new UAID. This ensures we expunge all stale
648 // registrations if the `userAgentID` pref is reset.
649 if (this._UAID != reply.uaid) {
650 console.debug("handleHelloReply: Received new UAID");
652 this._mainPushService
653 .dropUnexpiredRegistrations()
654 .then(finishHandshake.bind(this));
659 // otherwise we are good to go
660 finishHandshake.bind(this)();
664 * Protocol handler invoked by server message.
666 _handleRegisterReply(reply) {
667 console.debug("handleRegisterReply()");
669 let tmp = this._takeRequestForReply(reply);
674 if (reply.status == 200) {
676 Services.io.newURI(reply.pushEndpoint);
678 tmp.reject(new Error("Invalid push endpoint: " + reply.pushEndpoint));
682 let record = new PushRecordWebSocket({
683 channelID: reply.channelID,
684 pushEndpoint: reply.pushEndpoint,
685 scope: tmp.record.scope,
686 originAttributes: tmp.record.originAttributes,
688 systemRecord: tmp.record.systemRecord,
689 appServerKey: tmp.record.appServerKey,
694 console.error("handleRegisterReply: Unexpected server response", reply);
696 new Error("Wrong status code for register reply: " + reply.status)
701 _handleUnregisterReply(reply) {
702 console.debug("handleUnregisterReply()");
704 let request = this._takeRequestForReply(reply);
709 let success = reply.status === 200;
710 request.resolve(success);
713 _handleDataUpdate(update) {
715 if (typeof update.channelID != "string") {
717 "handleDataUpdate: Discarding update without channel ID",
722 function updateRecord(record) {
723 // Ignore messages that we've already processed. This can happen if the
724 // connection drops between notifying the service worker and acking the
725 // the message. In that case, the server will re-send the message on
727 if (record.hasRecentMessageID(update.version)) {
729 "handleDataUpdate: Ignoring duplicate message",
734 record.noteRecentMessageID(update.version);
737 if (typeof update.data != "string") {
738 promise = this._mainPushService.receivedPushMessage(
746 let message = ChromeUtils.base64URLDecode(update.data, {
747 // The Push server may append padding.
750 promise = this._mainPushService.receivedPushMessage(
761 this._sendAck(update.channelID, update.version, status);
765 "handleDataUpdate: Error delivering message",
772 Ci.nsIPushErrorReporter.ACK_DECRYPTION_ERROR
778 "handleDataUpdate: Error acknowledging message",
786 * Protocol handler invoked by server message.
788 _handleNotificationReply(reply) {
789 console.debug("handleNotificationReply()");
790 if (this._dataEnabled) {
791 this._handleDataUpdate(reply);
795 if (typeof reply.updates !== "object") {
796 console.warn("handleNotificationReply: Missing updates", reply.updates);
800 console.debug("handleNotificationReply: Got updates", reply.updates);
801 for (let i = 0; i < reply.updates.length; i++) {
802 let update = reply.updates[i];
803 console.debug("handleNotificationReply: Handling update", update);
804 if (typeof update.channelID !== "string") {
806 "handleNotificationReply: Invalid update at index",
813 if (update.version === undefined) {
814 console.debug("handleNotificationReply: Missing version", update);
818 let version = update.version;
820 if (typeof version === "string") {
821 version = parseInt(version, 10);
824 if (typeof version === "number" && version >= 0) {
825 // FIXME(nsm): this relies on app update notification being infallible!
826 // eventually fix this
827 this._receivedUpdate(update.channelID, version);
832 _handleBroadcastReply(reply) {
833 let phase = pushBroadcastService.PHASES.BROADCAST;
834 // Check if this reply is the result of registration.
835 for (const id of Object.keys(reply.broadcasts)) {
836 const wasRegistering = this._currentlyRegistering.delete(id);
837 if (wasRegistering) {
838 // If we get multiple broadcasts and only one is "registering",
839 // then we consider the phase to be REGISTER for all of them.
840 // It is acceptable since registrations do not happen so often,
841 // and are all very likely to occur soon after browser startup.
842 phase = pushBroadcastService.PHASES.REGISTER;
845 const context = { phase };
846 this._mainPushService.receivedBroadcastMessage(reply, context);
849 reportDeliveryError(messageID, reason) {
850 console.debug("reportDeliveryError()");
851 let code = kDELIVERY_REASON_TO_CODE[reason];
853 throw new Error("Invalid delivery error reason");
855 let data = { messageType: "nack", version: messageID, code };
856 this._queueRequest(data);
859 _sendAck(channelID, version, status) {
860 console.debug("sendAck()");
861 let code = kACK_STATUS_TO_CODE[status];
863 throw new Error("Invalid ack status");
865 let data = { messageType: "ack", updates: [{ channelID, version, code }] };
866 this._queueRequest(data);
870 // generateUUID() gives a UUID surrounded by {...}, slice them off.
878 console.debug("register() ", record);
880 let data = { channelID: this._generateID(), messageType: "register" };
882 if (record.appServerKey) {
883 data.key = ChromeUtils.base64URLEncode(record.appServerKey, {
884 // The Push server requires padding.
889 return this._sendRequestForReply(record, data).then(record => {
890 if (!this._dataEnabled) {
893 return PushCrypto.generateKeys().then(([publicKey, privateKey]) => {
894 record.p256dhPublicKey = publicKey;
895 record.p256dhPrivateKey = privateKey;
896 record.authenticationSecret = PushCrypto.generateAuthenticationSecret();
902 unregister(record, reason) {
903 console.debug("unregister() ", record, reason);
905 return Promise.resolve().then(_ => {
906 let code = kUNREGISTER_REASON_TO_CODE[reason];
908 throw new Error("Invalid unregister reason");
911 channelID: record.channelID,
912 messageType: "unregister",
916 return this._sendRequestForReply(record, data);
920 _queueStart: Promise.resolve(),
921 _notifyRequestQueue: null,
924 console.debug("enqueue()");
926 this._queue = this._queueStart;
928 this._queue = this._queue.then(op).catch(_ => {});
931 /** Sends a request to the server. */
933 if (this._currentState != STATE_READY) {
935 "send: Unexpected state; ignoring message",
940 if (!this._requestHasReply(data)) {
941 this._wsSendMessage(data);
944 // If we're expecting a reply, check that we haven't cancelled the request.
945 let key = this._makePendingRequestKey(data);
946 if (!this._pendingRequests.has(key)) {
947 console.log("send: Request cancelled; ignoring message", key);
950 this._wsSendMessage(data);
953 /** Indicates whether a request has a corresponding reply from the server. */
954 _requestHasReply(data) {
955 return data.messageType == "register" || data.messageType == "unregister";
959 * Sends all pending requests that expect replies. Called after the connection
960 * is established and the handshake is complete.
962 _sendPendingRequests() {
964 for (let request of this._pendingRequests.values()) {
965 this._send(request.data);
970 /** Queues an outgoing request, establishing a connection if necessary. */
971 _queueRequest(data) {
972 console.debug("queueRequest()", data);
974 if (this._currentState == STATE_READY) {
975 // If we're ready, no need to queue; just send the request.
980 // Otherwise, we're still setting up. If we don't have a request queue,
982 if (!this._notifyRequestQueue) {
983 let promise = new Promise((resolve, reject) => {
984 this._notifyRequestQueue = resolve;
986 this._enqueue(_ => promise);
989 let isRequest = this._requestHasReply(data);
991 // Don't queue requests, since they're stored in `_pendingRequests`, and
992 // `_sendPendingRequests` will send them after reconnecting. Without this
993 // check, we'd send requests twice.
994 this._enqueue(_ => this._send(data));
998 // This will end up calling notifyRequestQueue().
999 this._beginWSSetup();
1000 // If beginWSSetup does not succeed to make ws, notifyRequestQueue will
1002 if (!this._ws && this._notifyRequestQueue) {
1003 this._notifyRequestQueue();
1004 this._notifyRequestQueue = null;
1009 _receivedUpdate(aChannelID, aLatestVersion) {
1010 console.debug("receivedUpdate: Updating", aChannelID, "->", aLatestVersion);
1012 this._mainPushService
1013 .receivedPushMessage(aChannelID, "", null, null, record => {
1014 if (record.version === null || record.version < aLatestVersion) {
1016 "receivedUpdate: Version changed for",
1020 record.version = aLatestVersion;
1024 "receivedUpdate: No significant version change for",
1031 this._sendAck(aChannelID, aLatestVersion, status);
1035 "receivedUpdate: Error acknowledging message",
1043 // begin Push protocol handshake
1044 _wsOnStart(context) {
1045 console.debug("wsOnStart()");
1047 if (this._currentState != STATE_WAITING_FOR_WS_START) {
1049 "wsOnStart: NOT in STATE_WAITING_FOR_WS_START. Current",
1057 this._mainPushService
1060 records => this._sendHello(records),
1063 "Error fetching existing records before handshake; assuming none",
1066 this._sendHello([]);
1070 // If we failed to send the handshake, back off and reconnect.
1071 console.warn("Failed to send handshake; reconnecting", err);
1077 * Sends a `hello` handshake to the server.
1079 * @param {Array<PushRecordWebSocket>} An array of records for existing
1080 * subscriptions, used to determine whether to rotate our UAID.
1082 _sendHello(records) {
1084 messageType: "hello",
1085 broadcasts: this._broadcastListeners,
1089 if (records.length && this._UAID) {
1090 // Only send our UAID if we have existing push subscriptions, to
1091 // avoid tying a persistent identifier to the connection (bug
1092 // 1617136). The push server will issue our client a new UAID in
1093 // the `hello` response, which we'll store until either the next
1094 // time we reconnect, or the user subscribes to push. Once we have a
1095 // push subscription, we'll stop rotating the UAID when we connect,
1096 // so that we can receive push messages for them.
1097 data.uaid = this._UAID;
1100 this._wsSendMessage(data);
1101 this._currentState = STATE_WAITING_FOR_HELLO;
1105 * This statusCode is not the websocket protocol status code, but the TCP
1106 * connection close status code.
1108 * If we do not explicitly call ws.close() then statusCode is always
1109 * NS_BASE_STREAM_CLOSED, even on a successful close.
1111 _wsOnStop(context, statusCode) {
1112 console.debug("wsOnStop()");
1114 if (statusCode != Cr.NS_OK && !this._skipReconnect) {
1115 console.debug("wsOnStop: Reconnecting after socket error", statusCode);
1123 _wsOnMessageAvailable(context, message) {
1124 console.debug("wsOnMessageAvailable()", message);
1126 // Clearing the last ping time indicates we're no longer waiting for a pong.
1127 this._lastPingTime = 0;
1131 reply = JSON.parse(message);
1133 console.warn("wsOnMessageAvailable: Invalid JSON", message, e);
1137 // If we receive a message, we know the connection succeeded. Reset the
1138 // connection attempt and ping interval counters.
1139 this._retryFailCount = 0;
1141 let doNotHandle = false;
1144 reply.messageType === undefined ||
1145 reply.messageType === "ping" ||
1146 typeof reply.messageType != "string"
1148 console.debug("wsOnMessageAvailable: Pong received");
1152 // Reset the ping timer. Note: This path is executed at every step of the
1153 // handshake, so this timer does not need to be set explicitly at startup.
1154 this._startPingTimer();
1156 // If it is a ping, do not handle the message.
1161 // A whitelist of protocol handlers. Add to these if new messages are added
1171 // Build up the handler name to call from messageType.
1172 // e.g. messageType == "register" -> _handleRegisterReply.
1174 reply.messageType[0].toUpperCase() +
1175 reply.messageType.slice(1).toLowerCase();
1177 if (!handlers.includes(handlerName)) {
1179 "wsOnMessageAvailable: No whitelisted handler",
1187 let handler = "_handle" + handlerName + "Reply";
1189 if (typeof this[handler] !== "function") {
1191 "wsOnMessageAvailable: Handler",
1193 "whitelisted but not implemented"
1198 this[handler](reply);
1202 * The websocket should never be closed. Since we don't call ws.close(),
1203 * _wsOnStop() receives error code NS_BASE_STREAM_CLOSED (see comment in that
1204 * function), which calls reconnect and re-establishes the WebSocket
1207 * If the server requested that we back off, we won't reconnect until the
1208 * next network state change event, or until we need to send a new register
1211 _wsOnServerClose(context, aStatusCode, aReason) {
1212 console.debug("wsOnServerClose()", aStatusCode, aReason);
1214 if (aStatusCode == kBACKOFF_WS_STATUS_CODE) {
1215 console.debug("wsOnServerClose: Skipping automatic reconnect");
1216 this._skipReconnect = true;
1221 * Rejects all pending register requests with errors.
1223 _cancelPendingRequests() {
1224 for (let request of this._pendingRequests.values()) {
1225 request.reject(new Error("Request aborted"));
1227 this._pendingRequests.clear();
1230 /** Creates a case-insensitive map key for a request that expects a reply. */
1231 _makePendingRequestKey(data) {
1232 return (data.messageType + "|" + data.channelID).toLowerCase();
1235 /** Sends a request and waits for a reply from the server. */
1236 _sendRequestForReply(record, data) {
1237 return Promise.resolve().then(_ => {
1238 // start the timer since we now have at least one request
1239 this._startRequestTimeoutTimer();
1241 let key = this._makePendingRequestKey(data);
1242 if (!this._pendingRequests.has(key)) {
1248 request.promise = new Promise((resolve, reject) => {
1249 request.resolve = resolve;
1250 request.reject = reject;
1252 this._pendingRequests.set(key, request);
1253 this._queueRequest(data);
1256 return this._pendingRequests.get(key).promise;
1260 /** Removes and returns a pending request for a server reply. */
1261 _takeRequestForReply(reply) {
1262 if (typeof reply.channelID !== "string") {
1265 let key = this._makePendingRequestKey(reply);
1266 let request = this._pendingRequests.get(key);
1270 this._pendingRequests.delete(key);
1271 if (!this._hasPendingRequests()) {
1272 this._requestTimeoutTimer.cancel();
1277 sendSubscribeBroadcast(serviceId, version) {
1278 this._currentlyRegistering.add(serviceId);
1280 messageType: "broadcast_subscribe",
1282 [serviceId]: version,
1286 this._queueRequest(data);
1290 function PushRecordWebSocket(record) {
1291 PushRecord.call(this, record);
1292 this.channelID = record.channelID;
1293 this.version = record.version;
1296 PushRecordWebSocket.prototype = Object.create(PushRecord.prototype, {
1299 return this.channelID;
1304 PushRecordWebSocket.prototype.toSubscription = function() {
1305 let subscription = PushRecord.prototype.toSubscription.call(this);
1306 subscription.version = this.version;
1307 return subscription;