1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 ChromeUtils.defineESModuleGetters(lazy, {
8 EventEmitter: "resource://gre/modules/EventEmitter.sys.mjs",
10 BulkPacket: "chrome://remote/content/marionette/packets.sys.mjs",
11 executeSoon: "chrome://remote/content/marionette/sync.sys.mjs",
12 JSONPacket: "chrome://remote/content/marionette/packets.sys.mjs",
13 Packet: "chrome://remote/content/marionette/packets.sys.mjs",
14 StreamUtils: "chrome://remote/content/marionette/stream-utils.sys.mjs",
17 ChromeUtils.defineLazyGetter(lazy, "ScriptableInputStream", () => {
18 return Components.Constructor(
19 "@mozilla.org/scriptableinputstream;1",
20 "nsIScriptableInputStream",
25 const flags = { wantVerbose: false, wantLogging: false };
27 const dumpv = flags.wantVerbose
33 const PACKET_HEADER_MAX = 200;
36 * An adapter that handles data transfers between the debugger client
37 * and server. It can work with both nsIPipe and nsIServerSocket
38 * transports so long as the properly created input and output streams
39 * are specified. (However, for intra-process connections,
40 * LocalDebuggerTransport, below, is more efficient than using an nsIPipe
41 * pair with DebuggerTransport.)
43 * @param {nsIAsyncInputStream} input
45 * @param {nsIAsyncOutputStream} output
48 * Given a DebuggerTransport instance dt:
49 * 1) Set dt.hooks to a packet handler object (described below).
50 * 2) Call dt.ready() to begin watching for input packets.
51 * 3) Call dt.send() / dt.startBulkSend() to send packets.
52 * 4) Call dt.close() to close the connection, and disengage from
55 * A packet handler is an object with the following methods:
57 * - onPacket(packet) - called when we have received a complete packet.
58 * |packet| is the parsed form of the packet --- a JavaScript value, not
59 * a JSON-syntax string.
61 * - onBulkPacket(packet) - called when we have switched to bulk packet
62 * receiving mode. |packet| is an object containing:
63 * actor: Name of actor that will receive the packet
64 * type: Name of actor's method that should be called on receipt
65 * length: Size of the data to be read
66 * stream: This input stream should only be used directly if you
67 * can ensure that you will read exactly |length| bytes and
68 * will not close the stream when reading is complete
69 * done: If you use the stream directly (instead of |copyTo|
70 * below), you must signal completion by resolving/rejecting
71 * this deferred. If it's rejected, the transport will
72 * be closed. If an Error is supplied as a rejection value,
73 * it will be logged via |dump|. If you do use |copyTo|,
74 * resolving is taken care of for you when copying completes.
75 * copyTo: A helper function for getting your data out of the
76 * stream that meets the stream handling requirements above,
77 * and has the following signature:
80 * {nsIAsyncOutputStream} output
81 * The stream to copy to.
83 * The promise is resolved when copying completes or
84 * rejected if any (unexpected) errors occur. This object
85 * also emits "progress" events for each chunk that is
86 * copied. See stream-utils.js.
88 * - onClosed(reason) - called when the connection is closed. |reason|
89 * is an optional nsresult or object, typically passed when the
90 * transport is closed due to some error in a underlying stream.
92 * See ./packets.js and the Remote Debugging Protocol specification for
93 * more details on the format of these packets.
97 export function DebuggerTransport(input, output) {
98 lazy.EventEmitter.decorate(this);
101 this._scriptableInput = new lazy.ScriptableInputStream(input);
102 this._output = output;
104 // The current incoming (possibly partial) header, which will determine
105 // which type of Packet |_incoming| below will become.
106 this._incomingHeader = "";
107 // The current incoming Packet object
108 this._incoming = null;
109 // A queue of outgoing Packet objects
115 this._incomingEnabled = true;
116 this._outgoingEnabled = true;
118 this.close = this.close.bind(this);
121 DebuggerTransport.prototype = {
123 * Transmit an object as a JSON packet.
125 * This method returns immediately, without waiting for the entire
126 * packet to be transmitted, registering event handlers as needed to
127 * transmit the entire packet. Packets are transmitted in the order they
128 * are passed to this method.
131 this.emit("send", object);
133 let packet = new lazy.JSONPacket(this);
134 packet.object = object;
135 this._outgoing.push(packet);
136 this._flushOutgoing();
140 * Transmit streaming data via a bulk packet.
142 * This method initiates the bulk send process by queuing up the header
143 * data. The caller receives eventual access to a stream for writing.
145 * N.B.: Do *not* attempt to close the stream handed to you, as it
146 * will continue to be used by this transport afterwards. Most users
147 * should instead use the provided |copyFrom| function instead.
149 * @param {object} header
150 * This is modeled after the format of JSON packets above, but does
151 * not actually contain the data, but is instead just a routing
154 * - actor: Name of actor that will receive the packet
155 * - type: Name of actor's method that should be called on receipt
156 * - length: Size of the data to be sent
159 * The promise will be resolved when you are allowed to write to
160 * the stream with an object containing:
162 * - stream: This output stream should only be used directly
163 * if you can ensure that you will write exactly
164 * |length| bytes and will not close the stream when
165 * writing is complete.
166 * - done: If you use the stream directly (instead of
167 * |copyFrom| below), you must signal completion by
168 * resolving/rejecting this deferred. If it's
169 * rejected, the transport will be closed. If an
170 * Error is supplied as a rejection value, it will
171 * be logged via |dump|. If you do use |copyFrom|,
172 * resolving is taken care of for you when copying
174 * - copyFrom: A helper function for getting your data onto the
175 * stream that meets the stream handling requirements
176 * above, and has the following signature:
179 * {nsIAsyncInputStream} input
180 * The stream to copy from.
181 * - returns {Promise}
182 * The promise is resolved when copying completes
183 * or rejected if any (unexpected) errors occur.
184 * This object also emits "progress" events for
185 * each chunkthat is copied. See stream-utils.js.
187 startBulkSend(header) {
188 this.emit("startbulksend", header);
190 let packet = new lazy.BulkPacket(this);
191 packet.header = header;
192 this._outgoing.push(packet);
193 this._flushOutgoing();
194 return packet.streamReadyForWriting;
198 * Close the transport.
200 * @param {(nsresult|object)=} reason
201 * The status code or error message that corresponds to the reason
202 * for closing the transport (likely because a stream closed
206 this.emit("close", reason);
210 this._scriptableInput.close();
211 this._output.close();
212 this._destroyIncoming();
213 this._destroyAllOutgoing();
215 this.hooks.onClosed(reason);
219 dumpv("Transport closed: " + reason);
221 dumpv("Transport closed.");
226 * The currently outgoing packet (at the top of the queue).
228 get _currentOutgoing() {
229 return this._outgoing[0];
233 * Flush data to the outgoing stream. Waits until the output
234 * stream notifies us that it is ready to be written to (via
235 * onOutputStreamReady).
238 if (!this._outgoingEnabled || this._outgoing.length === 0) {
242 // If the top of the packet queue has nothing more to send, remove it.
243 if (this._currentOutgoing.done) {
244 this._finishCurrentOutgoing();
247 if (this._outgoing.length) {
248 let threadManager = Cc["@mozilla.org/thread-manager;1"].getService();
249 this._output.asyncWait(this, 0, 0, threadManager.currentThread);
254 * Pause this transport's attempts to write to the output stream.
255 * This is used when we've temporarily handed off our output stream for
259 this._outgoingEnabled = false;
263 * Resume this transport's attempts to write to the output stream.
266 this._outgoingEnabled = true;
267 this._flushOutgoing();
270 // nsIOutputStreamCallback
272 * This is called when the output stream is ready for more data to
273 * be written. The current outgoing packet will attempt to write some
274 * amount of data, but may not complete.
276 onOutputStreamReady(stream) {
277 if (!this._outgoingEnabled || this._outgoing.length === 0) {
282 this._currentOutgoing.write(stream);
284 if (e.result != Cr.NS_BASE_STREAM_WOULD_BLOCK) {
285 this.close(e.result);
291 this._flushOutgoing();
295 * Remove the current outgoing packet from the queue upon completion.
297 _finishCurrentOutgoing() {
298 if (this._currentOutgoing) {
299 this._currentOutgoing.destroy();
300 this._outgoing.shift();
305 * Clear the entire outgoing queue.
307 _destroyAllOutgoing() {
308 for (let packet of this._outgoing) {
315 * Initialize the input stream for reading. Once this method has been
316 * called, we watch for packets on the input stream, and pass them to
317 * the appropriate handlers via this.hooks.
321 this._waitForIncoming();
325 * Asks the input stream to notify us (via onInputStreamReady) when it is
329 if (this._incomingEnabled) {
330 let threadManager = Cc["@mozilla.org/thread-manager;1"].getService();
331 this._input.asyncWait(this, 0, 0, threadManager.currentThread);
336 * Pause this transport's attempts to read from the input stream.
337 * This is used when we've temporarily handed off our input stream for
341 this._incomingEnabled = false;
345 * Resume this transport's attempts to read from the input stream.
348 this._incomingEnabled = true;
349 this._flushIncoming();
350 this._waitForIncoming();
353 // nsIInputStreamCallback
355 * Called when the stream is either readable or closed.
357 onInputStreamReady(stream) {
360 stream.available() &&
361 this._incomingEnabled &&
362 this._processIncoming(stream, stream.available())
364 // Loop until there is nothing more to process
366 this._waitForIncoming();
368 if (e.result != Cr.NS_BASE_STREAM_WOULD_BLOCK) {
369 this.close(e.result);
377 * Process the incoming data. Will create a new currently incoming
378 * Packet if needed. Tells the incoming Packet to read as much data
379 * as it can, but reading may not complete. The Packet signals that
380 * its data is ready for delivery by calling one of this transport's
381 * _on*Ready methods (see ./packets.js and the _on*Ready methods below).
384 * Whether incoming stream processing should continue for any
387 _processIncoming(stream, count) {
388 dumpv("Data available: " + count);
391 dumpv("Nothing to read, skipping");
396 if (!this._incoming) {
397 dumpv("Creating a new packet from incoming");
399 if (!this._readHeader(stream)) {
400 // Not enough data to read packet type
404 // Attempt to create a new Packet by trying to parse each possible
406 this._incoming = lazy.Packet.fromHeader(this._incomingHeader, this);
407 if (!this._incoming) {
409 "No packet types for header: " + this._incomingHeader
414 if (!this._incoming.done) {
415 // We have an incomplete packet, keep reading it.
416 dumpv("Existing packet incomplete, keep reading");
417 this._incoming.read(stream, this._scriptableInput);
420 dump(`Error reading incoming packet: (${e} - ${e.stack})\n`);
422 // Now in an invalid state, shut down the transport.
427 if (!this._incoming.done) {
428 // Still not complete, we'll wait for more data.
429 dumpv("Packet not done, wait for more");
433 // Ready for next packet
434 this._flushIncoming();
439 * Read as far as we can into the incoming data, attempting to build
440 * up a complete packet header (which terminates with ":"). We'll only
441 * read up to PACKET_HEADER_MAX characters.
444 * True if we now have a complete header.
447 let amountToRead = PACKET_HEADER_MAX - this._incomingHeader.length;
448 this._incomingHeader += lazy.StreamUtils.delimitedRead(
449 this._scriptableInput,
453 if (flags.wantVerbose) {
454 dumpv("Header read: " + this._incomingHeader);
457 if (this._incomingHeader.endsWith(":")) {
458 if (flags.wantVerbose) {
459 dumpv("Found packet header successfully: " + this._incomingHeader);
464 if (this._incomingHeader.length >= PACKET_HEADER_MAX) {
465 throw new Error("Failed to parse packet header!");
468 // Not enough data yet.
473 * If the incoming packet is done, log it as needed and clear the buffer.
476 if (!this._incoming.done) {
479 if (flags.wantLogging) {
480 dumpv("Got: " + this._incoming);
482 this._destroyIncoming();
486 * Handler triggered by an incoming JSONPacket completing it's |read|
487 * method. Delivers the packet to this.hooks.onPacket.
489 _onJSONObjectReady(object) {
490 lazy.executeSoon(() => {
491 // Ensure the transport is still alive by the time this runs.
493 this.emit("packet", object);
494 this.hooks.onPacket(object);
500 * Handler triggered by an incoming BulkPacket entering the |read|
501 * phase for the stream portion of the packet. Delivers info about the
502 * incoming streaming data to this.hooks.onBulkPacket. See the main
503 * comment on the transport at the top of this file for more details.
505 _onBulkReadReady(...args) {
506 lazy.executeSoon(() => {
507 // Ensure the transport is still alive by the time this runs.
509 this.emit("bulkpacket", ...args);
510 this.hooks.onBulkPacket(...args);
516 * Remove all handlers and references related to the current incoming
517 * packet, either because it is now complete or because the transport
521 if (this._incoming) {
522 this._incoming.destroy();
524 this._incomingHeader = "";
525 this._incoming = null;