Bug 1467571 [wpt PR 11385] - Make manifest's parsers quicker, a=testonly
[gecko.git] / devtools / shared / transport / stream-utils.js
blobbf159b6825b84123c0cf4d7f7617dd38323c377d
1 /* This Source Code Form is subject to the terms of the Mozilla Public
2  * License, v. 2.0. If a copy of the MPL was not distributed with this
3  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 "use strict";
7 const { Ci, Cc, Cr, CC } = require("chrome");
8 const Services = require("Services");
9 const DevToolsUtils = require("devtools/shared/DevToolsUtils");
10 const { dumpv } = DevToolsUtils;
11 const EventEmitter = require("devtools/shared/event-emitter");
12 const defer = require("devtools/shared/defer");
14 DevToolsUtils.defineLazyGetter(this, "IOUtil", () => {
15   return Cc["@mozilla.org/io-util;1"].getService(Ci.nsIIOUtil);
16 });
18 DevToolsUtils.defineLazyGetter(this, "ScriptableInputStream", () => {
19   return CC("@mozilla.org/scriptableinputstream;1",
20             "nsIScriptableInputStream", "init");
21 });
23 const BUFFER_SIZE = 0x8000;
25 /**
26  * This helper function (and its companion object) are used by bulk senders and
27  * receivers to read and write data in and out of other streams.  Functions that
28  * make use of this tool are passed to callers when it is time to read or write
29  * bulk data.  It is highly recommended to use these copier functions instead of
30  * the stream directly because the copier enforces the agreed upon length.
31  * Since bulk mode reuses an existing stream, the sender and receiver must write
32  * and read exactly the agreed upon amount of data, or else the entire transport
33  * will be left in a invalid state.  Additionally, other methods of stream
34  * copying (such as NetUtil.asyncCopy) close the streams involved, which would
35  * terminate the debugging transport, and so it is avoided here.
36  *
37  * Overall, this *works*, but clearly the optimal solution would be able to just
38  * use the streams directly.  If it were possible to fully implement
39  * nsIInputStream / nsIOutputStream in JS, wrapper streams could be created to
40  * enforce the length and avoid closing, and consumers could use familiar stream
41  * utilities like NetUtil.asyncCopy.
42  *
43  * The function takes two async streams and copies a precise number of bytes
44  * from one to the other.  Copying begins immediately, but may complete at some
45  * future time depending on data size.  Use the returned promise to know when
46  * it's complete.
47  *
48  * @param input nsIAsyncInputStream
49  *        The stream to copy from.
50  * @param output nsIAsyncOutputStream
51  *        The stream to copy to.
52  * @param length Integer
53  *        The amount of data that needs to be copied.
54  * @return Promise
55  *         The promise is resolved when copying completes or rejected if any
56  *         (unexpected) errors occur.
57  */
58 function copyStream(input, output, length) {
59   const copier = new StreamCopier(input, output, length);
60   return copier.copy();
63 function StreamCopier(input, output, length) {
64   EventEmitter.decorate(this);
65   this._id = StreamCopier._nextId++;
66   this.input = input;
67   // Save off the base output stream, since we know it's async as we've required
68   this.baseAsyncOutput = output;
69   if (IOUtil.outputStreamIsBuffered(output)) {
70     this.output = output;
71   } else {
72     this.output = Cc["@mozilla.org/network/buffered-output-stream;1"]
73                   .createInstance(Ci.nsIBufferedOutputStream);
74     this.output.init(output, BUFFER_SIZE);
75   }
76   this._length = length;
77   this._amountLeft = length;
78   this._deferred = defer();
80   this._copy = this._copy.bind(this);
81   this._flush = this._flush.bind(this);
82   this._destroy = this._destroy.bind(this);
84   // Copy promise's then method up to this object.
85   // Allows the copier to offer a promise interface for the simple succeed or
86   // fail scenarios, but also emit events (due to the EventEmitter) for other
87   // states, like progress.
88   this.then = this._deferred.promise.then.bind(this._deferred.promise);
89   this.then(this._destroy, this._destroy);
91   // Stream ready callback starts as |_copy|, but may switch to |_flush| at end
92   // if flushing would block the output stream.
93   this._streamReadyCallback = this._copy;
95 StreamCopier._nextId = 0;
97 StreamCopier.prototype = {
99   copy: function() {
100     // Dispatch to the next tick so that it's possible to attach a progress
101     // event listener, even for extremely fast copies (like when testing).
102     Services.tm.dispatchToMainThread(() => {
103       try {
104         this._copy();
105       } catch (e) {
106         this._deferred.reject(e);
107       }
108     });
109     return this;
110   },
112   _copy: function() {
113     const bytesAvailable = this.input.available();
114     const amountToCopy = Math.min(bytesAvailable, this._amountLeft);
115     this._debug("Trying to copy: " + amountToCopy);
117     let bytesCopied;
118     try {
119       bytesCopied = this.output.writeFrom(this.input, amountToCopy);
120     } catch (e) {
121       if (e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK) {
122         this._debug("Base stream would block, will retry");
123         this._debug("Waiting for output stream");
124         this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread);
125         return;
126       }
127       throw e;
128     }
130     this._amountLeft -= bytesCopied;
131     this._debug("Copied: " + bytesCopied +
132                 ", Left: " + this._amountLeft);
133     this._emitProgress();
135     if (this._amountLeft === 0) {
136       this._debug("Copy done!");
137       this._flush();
138       return;
139     }
141     this._debug("Waiting for input stream");
142     this.input.asyncWait(this, 0, 0, Services.tm.currentThread);
143   },
145   _emitProgress: function() {
146     this.emit("progress", {
147       bytesSent: this._length - this._amountLeft,
148       totalBytes: this._length
149     });
150   },
152   _flush: function() {
153     try {
154       this.output.flush();
155     } catch (e) {
156       if (e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK ||
157           e.result == Cr.NS_ERROR_FAILURE) {
158         this._debug("Flush would block, will retry");
159         this._streamReadyCallback = this._flush;
160         this._debug("Waiting for output stream");
161         this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread);
162         return;
163       }
164       throw e;
165     }
166     this._deferred.resolve();
167   },
169   _destroy: function() {
170     this._destroy = null;
171     this._copy = null;
172     this._flush = null;
173     this.input = null;
174     this.output = null;
175   },
177   // nsIInputStreamCallback
178   onInputStreamReady: function() {
179     this._streamReadyCallback();
180   },
182   // nsIOutputStreamCallback
183   onOutputStreamReady: function() {
184     this._streamReadyCallback();
185   },
187   _debug: function(msg) {
188     // Prefix logs with the copier ID, which makes logs much easier to
189     // understand when several copiers are running simultaneously
190     dumpv("Copier: " + this._id + " " + msg);
191   }
196  * Read from a stream, one byte at a time, up to the next |delimiter|
197  * character, but stopping if we've read |count| without finding it.  Reading
198  * also terminates early if there are less than |count| bytes available on the
199  * stream.  In that case, we only read as many bytes as the stream currently has
200  * to offer.
201  * TODO: This implementation could be removed if bug 984651 is fixed, which
202  *       provides a native version of the same idea.
203  * @param stream nsIInputStream
204  *        The input stream to read from.
205  * @param delimiter string
206  *        The character we're trying to find.
207  * @param count integer
208  *        The max number of characters to read while searching.
209  * @return string
210  *         The data collected.  If the delimiter was found, this string will
211  *         end with it.
212  */
213 function delimitedRead(stream, delimiter, count) {
214   dumpv("Starting delimited read for " + delimiter + " up to " +
215         count + " bytes");
217   let scriptableStream;
218   if (stream instanceof Ci.nsIScriptableInputStream) {
219     scriptableStream = stream;
220   } else {
221     scriptableStream = new ScriptableInputStream(stream);
222   }
224   let data = "";
226   // Don't exceed what's available on the stream
227   count = Math.min(count, stream.available());
229   if (count <= 0) {
230     return data;
231   }
233   let char;
234   while (char !== delimiter && count > 0) {
235     char = scriptableStream.readBytes(1);
236     count--;
237     data += char;
238   }
240   return data;
243 module.exports = {
244   copyStream: copyStream,
245   delimitedRead: delimitedRead