1 /* jshint moz: true, esnext: true */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
4 * You can obtain one at http://mozilla.org/MPL/2.0/. */
8 const {PushDB} = ChromeUtils.import("resource://gre/modules/PushDB.jsm");
9 const {PushRecord} = ChromeUtils.import("resource://gre/modules/PushRecord.jsm");
10 ChromeUtils.import("resource://gre/modules/XPCOMUtils.jsm");
11 ChromeUtils.import("resource://gre/modules/Services.jsm");
12 ChromeUtils.import("resource://gre/modules/NetUtil.jsm");
13 ChromeUtils.import("resource://gre/modules/IndexedDBHelper.jsm");
14 ChromeUtils.import("resource://gre/modules/Timer.jsm");
19 } = ChromeUtils.import("resource://gre/modules/PushCrypto.jsm");
21 var EXPORTED_SYMBOLS = ["PushServiceHttp2"];
23 XPCOMUtils.defineLazyGetter(this, "console", () => {
24 let {ConsoleAPI} = ChromeUtils.import("resource://gre/modules/Console.jsm", {});
25 return new ConsoleAPI({
26 maxLogLevelPref: "dom.push.loglevel",
27 prefix: "PushServiceHttp2",
31 const prefs = Services.prefs.getBranch("dom.push.");
33 const kPUSHHTTP2DB_DB_NAME = "pushHttp2";
34 const kPUSHHTTP2DB_DB_VERSION = 5; // Change this if the IndexedDB format changes
35 const kPUSHHTTP2DB_STORE_NAME = "pushHttp2";
38 * A proxy between the PushService and connections listening for incoming push
39 * messages. The PushService can silence messages from the connections by
40 * setting PushSubscriptionListener._pushService to null. This is required
41 * because it can happen that there is an outstanding push message that will
42 * be send on OnStopRequest but the PushService may not be interested in these.
43 * It's easier to stop listening than to have checks at specific points.
45 var PushSubscriptionListener = function(pushService, uri) {
46 console.debug("PushSubscriptionListener()");
47 this._pushService = pushService;
51 PushSubscriptionListener.prototype = {
53 QueryInterface: function (aIID) {
54 if (aIID.equals(Ci.nsIHttpPushListener) ||
55 aIID.equals(Ci.nsIStreamListener)) {
58 throw Cr.NS_ERROR_NO_INTERFACE;
61 getInterface: function(aIID) {
62 return this.QueryInterface(aIID);
65 onStartRequest: function(aRequest, aContext) {
66 console.debug("PushSubscriptionListener: onStartRequest()");
67 // We do not do anything here.
70 onDataAvailable: function(aRequest, aContext, aStream, aOffset, aCount) {
71 console.debug("PushSubscriptionListener: onDataAvailable()");
72 // Nobody should send data, but just to be sure, otherwise necko will
78 let inputStream = Cc["@mozilla.org/scriptableinputstream;1"]
79 .createInstance(Ci.nsIScriptableInputStream);
81 inputStream.init(aStream);
82 var data = inputStream.read(aCount);
85 onStopRequest: function(aRequest, aContext, aStatusCode) {
86 console.debug("PushSubscriptionListener: onStopRequest()");
87 if (!this._pushService) {
91 this._pushService.connOnStop(aRequest,
92 Components.isSuccessCode(aStatusCode),
96 onPush: function(associatedChannel, pushChannel) {
97 console.debug("PushSubscriptionListener: onPush()");
98 var pushChannelListener = new PushChannelListener(this);
99 pushChannel.asyncOpen2(pushChannelListener);
102 disconnect: function() {
103 this._pushService = null;
108 * The listener for pushed messages. The message data is collected in
109 * OnDataAvailable and send to the app in OnStopRequest.
111 var PushChannelListener = function(pushSubscriptionListener) {
112 console.debug("PushChannelListener()");
113 this._mainListener = pushSubscriptionListener;
118 PushChannelListener.prototype = {
120 onStartRequest: function(aRequest, aContext) {
121 this._ackUri = aRequest.URI.spec;
124 onDataAvailable: function(aRequest, aContext, aStream, aOffset, aCount) {
125 console.debug("PushChannelListener: onDataAvailable()");
131 let inputStream = Cc["@mozilla.org/binaryinputstream;1"]
132 .createInstance(Ci.nsIBinaryInputStream);
134 inputStream.setInputStream(aStream);
135 let chunk = new ArrayBuffer(aCount);
136 inputStream.readArrayBuffer(aCount, chunk);
137 this._message.push(chunk);
140 onStopRequest: function(aRequest, aContext, aStatusCode) {
141 console.debug("PushChannelListener: onStopRequest()", "status code",
143 if (Components.isSuccessCode(aStatusCode) &&
144 this._mainListener &&
145 this._mainListener._pushService) {
147 encryption_key: getHeaderField(aRequest, "Encryption-Key"),
148 crypto_key: getHeaderField(aRequest, "Crypto-Key"),
149 encryption: getHeaderField(aRequest, "Encryption"),
150 encoding: getHeaderField(aRequest, "Content-Encoding"),
152 let msg = concatArray(this._message);
154 this._mainListener._pushService._pushChannelOnStop(this._mainListener.uri,
162 function getHeaderField(aRequest, name) {
164 return aRequest.getRequestHeader(name);
166 // getRequestHeader can throw.
171 var PushServiceDelete = function(resolve, reject) {
172 this._resolve = resolve;
173 this._reject = reject;
176 PushServiceDelete.prototype = {
178 onStartRequest: function(aRequest, aContext) {},
180 onDataAvailable: function(aRequest, aContext, aStream, aOffset, aCount) {
181 // Nobody should send data, but just to be sure, otherwise necko will
187 let inputStream = Cc["@mozilla.org/scriptableinputstream;1"]
188 .createInstance(Ci.nsIScriptableInputStream);
190 inputStream.init(aStream);
191 var data = inputStream.read(aCount);
194 onStopRequest: function(aRequest, aContext, aStatusCode) {
196 if (Components.isSuccessCode(aStatusCode)) {
199 this._reject(new Error("Error removing subscription: " + aStatusCode));
204 var SubscriptionListener = function(aSubInfo, aResolve, aReject,
205 aServerURI, aPushServiceHttp2) {
206 console.debug("SubscriptionListener()");
207 this._subInfo = aSubInfo;
208 this._resolve = aResolve;
209 this._reject = aReject;
211 this._serverURI = aServerURI;
212 this._service = aPushServiceHttp2;
213 this._ctime = Date.now();
214 this._retryTimeoutID = null;
217 SubscriptionListener.prototype = {
219 onStartRequest: function(aRequest, aContext) {},
221 onDataAvailable: function(aRequest, aContext, aStream, aOffset, aCount) {
222 console.debug("SubscriptionListener: onDataAvailable()");
224 // We do not expect any data, but necko will complain if we do not consume
230 let inputStream = Cc["@mozilla.org/scriptableinputstream;1"]
231 .createInstance(Ci.nsIScriptableInputStream);
233 inputStream.init(aStream);
234 this._data.concat(inputStream.read(aCount));
237 onStopRequest: function(aRequest, aContext, aStatus) {
238 console.debug("SubscriptionListener: onStopRequest()");
240 // Check if pushService is still active.
241 if (!this._service.hasmainPushService()) {
242 this._reject(new Error("Push service unavailable"));
246 if (!Components.isSuccessCode(aStatus)) {
247 this._reject(new Error("Error listening for messages: " + aStatus));
251 var statusCode = aRequest.QueryInterface(Ci.nsIHttpChannel).responseStatus;
253 if (Math.floor(statusCode / 100) == 5) {
254 if (this._subInfo.retries < prefs.getIntPref("http2.maxRetries")) {
255 this._subInfo.retries++;
256 var retryAfter = retryAfterParser(aRequest);
257 this._retryTimeoutID = setTimeout(_ =>
262 subInfo: this._subInfo
264 this._service.removeListenerPendingRetry(this);
265 this._retryTimeoutID = null;
267 this._service.addListenerPendingRetry(this);
269 this._reject(new Error("Unexpected server response: " + statusCode));
272 } else if (statusCode != 201) {
273 this._reject(new Error("Unexpected server response: " + statusCode));
279 subscriptionUri = aRequest.getResponseHeader("location");
281 this._reject(new Error("Missing Location header"));
285 console.debug("onStopRequest: subscriptionUri", subscriptionUri);
289 linkList = aRequest.getResponseHeader("link");
291 this._reject(new Error("Missing Link header"));
295 var linkParserResult;
297 linkParserResult = linkParser(linkList, this._serverURI);
303 if (!subscriptionUri) {
304 this._reject(new Error("Invalid Location header"));
308 let uriTry = Services.io.newURI(subscriptionUri);
310 console.error("onStopRequest: Invalid subscription URI",
312 this._reject(new Error("Invalid subscription endpoint: " +
317 let reply = new PushRecordHttp2({
318 subscriptionUri: subscriptionUri,
319 pushEndpoint: linkParserResult.pushEndpoint,
320 pushReceiptEndpoint: linkParserResult.pushReceiptEndpoint,
321 scope: this._subInfo.record.scope,
322 originAttributes: this._subInfo.record.originAttributes,
323 systemRecord: this._subInfo.record.systemRecord,
324 appServerKey: this._subInfo.record.appServerKey,
328 this._resolve(reply);
331 abortRetry: function() {
332 if (this._retryTimeoutID != null) {
333 clearTimeout(this._retryTimeoutID);
334 this._retryTimeoutID = null;
336 console.debug("SubscriptionListener.abortRetry: aborting non-existent retry?");
341 function retryAfterParser(aRequest) {
344 var retryField = aRequest.getResponseHeader("retry-after");
345 if (isNaN(retryField)) {
346 retryAfter = Date.parse(retryField) - (new Date().getTime());
348 retryAfter = parseInt(retryField, 10) * 1000;
350 retryAfter = (retryAfter > 0) ? retryAfter : 0;
356 function linkParser(linkHeader, serverURI) {
358 var linkList = linkHeader.split(',');
359 if ((linkList.length < 1)) {
360 throw new Error("Invalid Link header");
364 var pushReceiptEndpoint;
366 linkList.forEach(link => {
367 var linkElems = link.split(';');
369 if (linkElems.length == 2) {
370 if (linkElems[1].trim() === 'rel="urn:ietf:params:push"') {
371 pushEndpoint = linkElems[0].substring(linkElems[0].indexOf('<') + 1,
372 linkElems[0].indexOf('>'));
374 } else if (linkElems[1].trim() === 'rel="urn:ietf:params:push:receipt"') {
375 pushReceiptEndpoint = linkElems[0].substring(linkElems[0].indexOf('<') + 1,
376 linkElems[0].indexOf('>'));
381 console.debug("linkParser: pushEndpoint", pushEndpoint);
382 console.debug("linkParser: pushReceiptEndpoint", pushReceiptEndpoint);
383 // Missing pushReceiptEndpoint is allowed.
385 throw new Error("Missing push endpoint");
388 var pushURI = Services.io.newURI(pushEndpoint, null, serverURI);
390 if (pushReceiptEndpoint) {
391 pushReceiptURI = Services.io.newURI(pushReceiptEndpoint, null,
396 pushEndpoint: pushURI.spec,
397 pushReceiptEndpoint: (pushReceiptURI) ? pushReceiptURI.spec : "",
402 * The implementation of the WebPush.
404 var PushServiceHttp2 = {
405 _mainPushService: null,
408 // Keep information about all connections, e.g. the channel, listener...
412 // Set of SubscriptionListeners that are pending a subscription retry attempt.
413 _listenersPendingRetry: new Set(),
415 newPushDB: function() {
416 return new PushDB(kPUSHHTTP2DB_DB_NAME,
417 kPUSHHTTP2DB_DB_VERSION,
418 kPUSHHTTP2DB_STORE_NAME,
423 hasmainPushService: function() {
424 return this._mainPushService !== null;
427 validServerURI: function(serverURI) {
428 if (serverURI.scheme == "http") {
429 return !!prefs.getBoolPref("testing.allowInsecureServerURL", false);
431 return serverURI.scheme == "https";
434 connect: function(subscriptions) {
435 this.startConnections(subscriptions);
438 isConnected: function() {
439 return this._mainPushService != null;
442 disconnect: function() {
443 this._shutdownConnections(false);
446 _makeChannel: function(aUri) {
447 var chan = NetUtil.newChannel({uri: aUri, loadUsingSystemPrincipal: true})
448 .QueryInterface(Ci.nsIHttpChannel);
450 var loadGroup = Cc["@mozilla.org/network/load-group;1"]
451 .createInstance(Ci.nsILoadGroup);
452 chan.loadGroup = loadGroup;
457 * Subscribe new resource.
459 register: function(aRecord) {
460 console.debug("subscribeResource()");
462 return this._subscribeResourceInternal({
467 PushCrypto.generateKeys()
468 .then(([publicKey, privateKey]) => {
469 result.p256dhPublicKey = publicKey;
470 result.p256dhPrivateKey = privateKey;
471 result.authenticationSecret = PushCrypto.generateAuthenticationSecret();
472 this._conns[result.subscriptionUri] = {
475 countUnableToConnect: 0,
476 lastStartListening: 0,
479 this._listenForMsgs(result.subscriptionUri);
485 _subscribeResourceInternal: function(aSubInfo) {
486 console.debug("subscribeResourceInternal()");
488 return new Promise((resolve, reject) => {
489 var listener = new SubscriptionListener(aSubInfo,
495 var chan = this._makeChannel(this._serverURI.spec);
496 chan.requestMethod = "POST";
497 chan.asyncOpen2(listener);
500 if ("retry" in err) {
501 return this._subscribeResourceInternal(err.subInfo);
508 _deleteResource: function(aUri) {
510 return new Promise((resolve,reject) => {
511 var chan = this._makeChannel(aUri);
512 chan.requestMethod = "DELETE";
513 chan.asyncOpen2(new PushServiceDelete(resolve, reject));
518 * Unsubscribe the resource with a subscription uri aSubscriptionUri.
519 * We can't do anything about it if it fails, so we don't listen for response.
521 _unsubscribeResource: function(aSubscriptionUri) {
522 console.debug("unsubscribeResource()");
524 return this._deleteResource(aSubscriptionUri);
528 * Start listening for messages.
530 _listenForMsgs: function(aSubscriptionUri) {
531 console.debug("listenForMsgs()", aSubscriptionUri);
532 if (!this._conns[aSubscriptionUri]) {
533 console.warn("listenForMsgs: We do not have this subscription",
538 var chan = this._makeChannel(aSubscriptionUri);
541 var listener = new PushSubscriptionListener(this, aSubscriptionUri);
542 conn.listener = listener;
544 chan.notificationCallbacks = listener;
547 chan.asyncOpen2(listener);
549 console.error("listenForMsgs: Error connecting to push server.",
550 "asyncOpen2 failed", e);
551 conn.listener.disconnect();
552 chan.cancel(Cr.NS_ERROR_ABORT);
553 this._retryAfterBackoff(aSubscriptionUri, -1);
557 this._conns[aSubscriptionUri].lastStartListening = Date.now();
558 this._conns[aSubscriptionUri].channel = conn.channel;
559 this._conns[aSubscriptionUri].listener = conn.listener;
563 _ackMsgRecv: function(aAckUri) {
564 console.debug("ackMsgRecv()", aAckUri);
565 return this._deleteResource(aAckUri);
568 init: function(aOptions, aMainPushService, aServerURL) {
569 console.debug("init()");
570 this._mainPushService = aMainPushService;
571 this._serverURI = aServerURL;
573 return Promise.resolve();
576 _retryAfterBackoff: function(aSubscriptionUri, retryAfter) {
577 console.debug("retryAfterBackoff()");
579 var resetRetryCount = prefs.getIntPref("http2.reset_retry_count_after_ms");
580 // If it was running for some time, reset retry counter.
581 if ((Date.now() - this._conns[aSubscriptionUri].lastStartListening) >
583 this._conns[aSubscriptionUri].countUnableToConnect = 0;
586 let maxRetries = prefs.getIntPref("http2.maxRetries");
587 if (this._conns[aSubscriptionUri].countUnableToConnect >= maxRetries) {
588 this._shutdownSubscription(aSubscriptionUri);
589 this._resubscribe(aSubscriptionUri);
593 if (retryAfter !== -1) {
594 // This is a 5xx response.
595 this._conns[aSubscriptionUri].countUnableToConnect++;
596 this._conns[aSubscriptionUri].retryTimerID =
597 setTimeout(_ => this._listenForMsgs(aSubscriptionUri), retryAfter);
601 retryAfter = prefs.getIntPref("http2.retryInterval") *
602 Math.pow(2, this._conns[aSubscriptionUri].countUnableToConnect);
604 retryAfter = retryAfter * (0.8 + Math.random() * 0.4); // add +/-20%.
606 this._conns[aSubscriptionUri].countUnableToConnect++;
607 this._conns[aSubscriptionUri].retryTimerID =
608 setTimeout(_ => this._listenForMsgs(aSubscriptionUri), retryAfter);
610 console.debug("retryAfterBackoff: Retry in", retryAfter);
613 // Close connections.
614 _shutdownConnections: function(deleteInfo) {
615 console.debug("shutdownConnections()");
617 for (let subscriptionUri in this._conns) {
618 if (this._conns[subscriptionUri]) {
619 if (this._conns[subscriptionUri].listener) {
620 this._conns[subscriptionUri].listener._pushService = null;
623 if (this._conns[subscriptionUri].channel) {
625 this._conns[subscriptionUri].channel.cancel(Cr.NS_ERROR_ABORT);
628 this._conns[subscriptionUri].listener = null;
629 this._conns[subscriptionUri].channel = null;
631 if (this._conns[subscriptionUri].retryTimerID > 0) {
632 clearTimeout(this._conns[subscriptionUri].retryTimerID);
636 delete this._conns[subscriptionUri];
642 // Start listening if subscriptions present.
643 startConnections: function(aSubscriptions) {
644 console.debug("startConnections()", aSubscriptions.length);
646 for (let i = 0; i < aSubscriptions.length; i++) {
647 let record = aSubscriptions[i];
648 this._mainPushService.ensureCrypto(record).then(record => {
649 this._startSingleConnection(record);
651 console.error("startConnections: Error updating record",
652 record.keyID, error);
657 _startSingleConnection: function(record) {
658 console.debug("_startSingleConnection()");
659 if (typeof this._conns[record.subscriptionUri] != "object") {
660 this._conns[record.subscriptionUri] = {channel: null,
662 countUnableToConnect: 0,
665 if (!this._conns[record.subscriptionUri].conn) {
666 this._listenForMsgs(record.subscriptionUri);
670 // Close connection and notify apps that subscription are gone.
671 _shutdownSubscription: function(aSubscriptionUri) {
672 console.debug("shutdownSubscriptions()");
674 if (typeof this._conns[aSubscriptionUri] == "object") {
675 if (this._conns[aSubscriptionUri].listener) {
676 this._conns[aSubscriptionUri].listener._pushService = null;
679 if (this._conns[aSubscriptionUri].channel) {
681 this._conns[aSubscriptionUri].channel.cancel(Cr.NS_ERROR_ABORT);
684 delete this._conns[aSubscriptionUri];
689 console.debug("uninit()");
690 this._abortPendingSubscriptionRetries();
691 this._shutdownConnections(true);
692 this._mainPushService = null;
695 _abortPendingSubscriptionRetries: function() {
696 this._listenersPendingRetry.forEach((listener) => listener.abortRetry());
697 this._listenersPendingRetry.clear();
700 unregister: function(aRecord) {
701 this._shutdownSubscription(aRecord.subscriptionUri);
702 return this._unsubscribeResource(aRecord.subscriptionUri);
705 reportDeliveryError: function(messageID, reason) {
706 console.warn("reportDeliveryError: Ignoring message delivery error",
710 /** Push server has deleted subscription.
711 * Re-subscribe - if it succeeds send update db record and send
712 * pushsubscriptionchange,
713 * - on error delete record and send pushsubscriptionchange
714 * TODO: maybe pushsubscriptionerror will be included.
716 _resubscribe: function(aSubscriptionUri) {
717 this._mainPushService.getByKeyID(aSubscriptionUri)
718 .then(record => this.register(record)
720 if (this._mainPushService) {
721 this._mainPushService
722 .updateRegistrationAndNotifyApp(aSubscriptionUri, recordNew)
723 .catch(Cu.reportError);
726 if (this._mainPushService) {
727 this._mainPushService
728 .dropRegistrationAndNotifyApp(aSubscriptionUri)
729 .catch(Cu.reportError);
735 connOnStop: function(aRequest, aSuccess,
737 console.debug("connOnStop() succeeded", aSuccess);
739 var conn = this._conns[aSubscriptionUri];
741 // there is no connection description that means that we closed
742 // connection, so do nothing. But we should have already deleted
748 conn.listener = null;
751 this._retryAfterBackoff(aSubscriptionUri, -1);
753 } else if (Math.floor(aRequest.responseStatus / 100) == 5) {
754 var retryAfter = retryAfterParser(aRequest);
755 this._retryAfterBackoff(aSubscriptionUri, retryAfter);
757 } else if (Math.floor(aRequest.responseStatus / 100) == 4) {
758 this._shutdownSubscription(aSubscriptionUri);
759 this._resubscribe(aSubscriptionUri);
760 } else if (Math.floor(aRequest.responseStatus / 100) == 2) { // This should be 204
761 setTimeout(_ => this._listenForMsgs(aSubscriptionUri), 0);
763 this._retryAfterBackoff(aSubscriptionUri, -1);
767 addListenerPendingRetry: function(aListener) {
768 this._listenersPendingRetry.add(aListener);
771 removeListenerPendingRetry: function(aListener) {
772 if (!this._listenersPendingRetry.remove(aListener)) {
773 console.debug("removeListenerPendingRetry: listener not in list?");
777 _pushChannelOnStop: function(aUri, aAckUri, aHeaders, aMessage) {
778 console.debug("pushChannelOnStop()");
780 this._mainPushService.receivedPushMessage(
781 aUri, "", aHeaders, aMessage, record => {
782 // Always update the stored record.
786 .then(_ => this._ackMsgRecv(aAckUri))
788 console.error("pushChannelOnStop: Error receiving message",
794 function PushRecordHttp2(record) {
795 PushRecord.call(this, record);
796 this.subscriptionUri = record.subscriptionUri;
797 this.pushReceiptEndpoint = record.pushReceiptEndpoint;
800 PushRecordHttp2.prototype = Object.create(PushRecord.prototype, {
803 return this.subscriptionUri;
808 PushRecordHttp2.prototype.toSubscription = function() {
809 let subscription = PushRecord.prototype.toSubscription.call(this);
810 subscription.pushReceiptEndpoint = this.pushReceiptEndpoint;