From 66d0cf5616a4897b9983e43558335604a6deb726 Mon Sep 17 00:00:00 2001 From: "Shawn O. Pearce" Date: Sun, 20 Apr 2008 05:54:25 -0400 Subject: [PATCH] Add sideband support to PackFetchConnection When we fetch from a remote system over the anonymous Git protocol we don't have progress messages come back on stderr like we would on an SSH connection. Instead we need to have them multiplexed on the same stream as the pack data itself, and then split the data on the client side to the proper locations. Unlike C Git (and FetchClient before us) we avoid using threads to do the de-multiplexing and instead perform this as part of the InputStream implementation we pass off to IndexPack. This keeps the our fetch code simple and avoids any issues related to thread synchronization. Signed-off-by: Shawn O. Pearce --- .../org/spearce/jgit/lib/TextProgressMonitor.java | 6 +- .../jgit/transport/PackFetchConnection.java | 12 +- .../org/spearce/jgit/transport/PacketLineIn.java | 7 +- .../jgit/transport/SideBandInputStream.java | 178 +++++++++++++++++++++ 4 files changed, 197 insertions(+), 6 deletions(-) create mode 100644 org.spearce.jgit/src/org/spearce/jgit/transport/SideBandInputStream.java diff --git a/org.spearce.jgit/src/org/spearce/jgit/lib/TextProgressMonitor.java b/org.spearce.jgit/src/org/spearce/jgit/lib/TextProgressMonitor.java index 4d5d7abc..5ac28714 100644 --- a/org.spearce.jgit/src/org/spearce/jgit/lib/TextProgressMonitor.java +++ b/org.spearce.jgit/src/org/spearce/jgit/lib/TextProgressMonitor.java @@ -55,10 +55,8 @@ public class TextProgressMonitor implements ProgressMonitor { if (!output && System.currentTimeMillis() - taskBeganAt < 500) return; if (totalWork == UNKNOWN) { - if (cmp % 100 == 0) { - display(cmp); - System.err.flush(); - } + display(cmp); + System.err.flush(); } else { if ((cmp * 100 / totalWork) != (lastWorked * 100) / totalWork) { display(cmp); diff --git a/org.spearce.jgit/src/org/spearce/jgit/transport/PackFetchConnection.java b/org.spearce.jgit/src/org/spearce/jgit/transport/PackFetchConnection.java index 0f731fbf..c7fa66c0 100644 --- a/org.spearce.jgit/src/org/spearce/jgit/transport/PackFetchConnection.java +++ b/org.spearce.jgit/src/org/spearce/jgit/transport/PackFetchConnection.java @@ -74,6 +74,8 @@ abstract class PackFetchConnection extends FetchConnection { static final String OPTION_THIN_PACK = "thin-pack"; + static final String OPTION_SIDE_BAND = "side-band"; + static final String OPTION_SIDE_BAND_64K = "side-band-64k"; static final String OPTION_OFS_DELTA = "ofs-delta"; @@ -119,6 +121,8 @@ abstract class PackFetchConnection extends FetchConnection { private boolean thinPack; + private boolean sideband; + PackFetchConnection(final PackTransport packTransport) { local = packTransport.local; uri = packTransport.uri; @@ -330,6 +334,10 @@ abstract class PackFetchConnection extends FetchConnection { wantCapability(line, OPTION_OFS_DELTA); multiAck = wantCapability(line, OPTION_MULTI_ACK); thinPack = wantCapability(line, OPTION_THIN_PACK); + if (wantCapability(line, OPTION_SIDE_BAND_64K)) + sideband = true; + else if (wantCapability(line, OPTION_SIDE_BAND)) + sideband = true; return line.toString(); } @@ -520,7 +528,9 @@ abstract class PackFetchConnection extends FetchConnection { } private void receivePack(final ProgressMonitor monitor) throws IOException { - final IndexPack ip = IndexPack.create(local, in); + final IndexPack ip; + + ip = IndexPack.create(local, sideband ? pckIn.sideband(monitor) : in); ip.setFixThin(thinPack); ip.index(monitor); ip.renameAndOpenPack(); diff --git a/org.spearce.jgit/src/org/spearce/jgit/transport/PacketLineIn.java b/org.spearce.jgit/src/org/spearce/jgit/transport/PacketLineIn.java index a6b6286f..37301369 100644 --- a/org.spearce.jgit/src/org/spearce/jgit/transport/PacketLineIn.java +++ b/org.spearce.jgit/src/org/spearce/jgit/transport/PacketLineIn.java @@ -23,6 +23,7 @@ import java.util.Arrays; import org.spearce.jgit.errors.PackProtocolException; import org.spearce.jgit.lib.Constants; import org.spearce.jgit.lib.MutableObjectId; +import org.spearce.jgit.lib.ProgressMonitor; import org.spearce.jgit.util.NB; class PacketLineIn { @@ -55,6 +56,10 @@ class PacketLineIn { lenbuffer = new byte[4]; } + InputStream sideband(final ProgressMonitor pm) { + return new SideBandInputStream(this, in, pm); + } + AckNackResult readACK(final MutableObjectId returnedId) throws IOException { final String line = readString(); if (line.length() == 0) @@ -88,7 +93,7 @@ class PacketLineIn { throw new IOException("Protocol error: expected LF"); } - private int readLength() throws IOException { + int readLength() throws IOException { NB.readFully(in, lenbuffer, 0, 4); try { int r = fromhex[lenbuffer[0]] << 4; diff --git a/org.spearce.jgit/src/org/spearce/jgit/transport/SideBandInputStream.java b/org.spearce.jgit/src/org/spearce/jgit/transport/SideBandInputStream.java new file mode 100644 index 00000000..84334327 --- /dev/null +++ b/org.spearce.jgit/src/org/spearce/jgit/transport/SideBandInputStream.java @@ -0,0 +1,178 @@ +/* + * Copyright (C) 2007,2008 Robin Rosenberg + * Copyright (C) 2008 Shawn Pearce + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License, version 2, as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 + */ +package org.spearce.jgit.transport; + +import java.io.IOException; +import java.io.InputStream; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.spearce.jgit.errors.PackProtocolException; +import org.spearce.jgit.errors.TransportException; +import org.spearce.jgit.lib.Constants; +import org.spearce.jgit.lib.ProgressMonitor; +import org.spearce.jgit.util.NB; + +/** + * Unmultiplexes the data portion of a side-band channel. + *

+ * Reading from this input stream obtains data from channel 1, which is + * typically the bulk data stream. + *

+ * Channel 2 is transparently unpacked and "scraped" to update a progress + * monitor. The scraping is performed behind the scenes as part of any of the + * read methods offered by this stream. + *

+ * Channel 3 results in an exception being thrown, as the remote side has issued + * an unrecoverable error. + * + * @see PacketLineIn#sideband(ProgressMonitor) + */ +class SideBandInputStream extends InputStream { + private static final int CH_DATA = 1; + + private static final int CH_PROGRESS = 2; + + private static final int CH_ERROR = 3; + + private static Pattern P_UNBOUNDED = Pattern.compile( + ".*?([\\w ]+): (\\d+)(, done)?.*", Pattern.DOTALL); + + private static Pattern P_BOUNDED = Pattern.compile( + ".*?([\\w ]+):.*\\((\\d+)/(\\d+)\\).*", Pattern.DOTALL); + + private final PacketLineIn pckIn; + + private final InputStream in; + + private final ProgressMonitor monitor; + + private String currentTask; + + private int lastCnt; + + private boolean eof; + + private int channel; + + private int available; + + SideBandInputStream(final PacketLineIn aPckIn, final InputStream aIn, + final ProgressMonitor aProgress) { + pckIn = aPckIn; + in = aIn; + monitor = aProgress; + currentTask = ""; + } + + @Override + public int read() throws IOException { + needDataPacket(); + if (eof) + return -1; + available--; + return in.read(); + } + + @Override + public int read(final byte[] b, int off, int len) throws IOException { + int r = 0; + while (len > 0) { + needDataPacket(); + if (eof) + break; + final int n = in.read(b, off, Math.min(len, available)); + if (n < 0) + break; + r += n; + off += n; + len -= n; + available -= n; + } + return eof && r == 0 ? -1 : r; + } + + private void needDataPacket() throws IOException { + if (eof || (channel == CH_DATA && available > 0)) + return; + for (;;) { + available = pckIn.readLength(); + if (available == 0) { + eof = true; + return; + } + + channel = in.read(); + available -= 5; // length header plus channel indicator + if (available == 0) + continue; + + switch (channel) { + case CH_DATA: + return; + case CH_PROGRESS: + progress(readString(available)); + + continue; + case CH_ERROR: + eof = true; + throw new TransportException("remote: " + readString(available)); + default: + throw new PackProtocolException("Invalid channel " + channel); + } + } + } + + private void progress(final String msg) { + Matcher matcher; + + matcher = P_BOUNDED.matcher(msg); + if (matcher.matches()) { + final String taskname = matcher.group(1); + if (!currentTask.equals(taskname)) { + currentTask = taskname; + lastCnt = 0; + final int tot = Integer.parseInt(matcher.group(3)); + monitor.beginTask(currentTask, tot); + } + final int cnt = Integer.parseInt(matcher.group(2)); + monitor.update(cnt - lastCnt); + lastCnt = cnt; + return; + } + + matcher = P_UNBOUNDED.matcher(msg); + if (matcher.matches()) { + final String taskname = matcher.group(1); + if (!currentTask.equals(taskname)) { + currentTask = taskname; + lastCnt = 0; + monitor.beginTask(currentTask, ProgressMonitor.UNKNOWN); + } + final int cnt = Integer.parseInt(matcher.group(2)); + monitor.update(cnt - lastCnt); + lastCnt = cnt; + } + } + + private String readString(final int len) throws IOException { + final byte[] raw = new byte[len]; + NB.readFully(in, raw, 0, len); + return new String(raw, 0, len, Constants.CHARACTER_ENCODING); + } +} -- 2.11.4.GIT