From 673b3984bdc540da99e1c390cf31f455896a1cda Mon Sep 17 00:00:00 2001 From: "Shawn O. Pearce" Date: Wed, 10 Feb 2010 11:49:27 -0800 Subject: [PATCH] Capture non-progress side band #2 messages and put in result Any messages received on side band #2 that aren't scraped as a progress message into our ProgressMonitor are now forwarded to a buffer which is later included into the OperationResult object. Application callers can use this buffer to present the additional messages from the remote peer after the push or fetch operation has concluded. The smart push connections using the native send-pack/receive-pack protocol now request side-band-64k capability if it is available and forward any messages received through that channel onto this message buffer. This makes hook messages available over smart HTTP, or even over SSH. The SSH transport was modified to redirect the remote command's stderr stream into the message buffer, interleaved with any data received over side band #2. Due to buffering between these two different channels in the SSH channel mux itself the order of any writes between the two cannot be ensured, but it tries to stay close. The local fork transport was also modified to redirect the local receive-pack's stderr into the message buffer, rather than going to the invoking JVM's System.err. This gives applications a chance to log the local error messages, rather than needing to redirect their JVM's stderr before startup. To keep things simple, the application has to wait for the entire operation to complete before it can see the messages. This may be a downside if the user is trying to debug a remote hook that is blocking indefinitely, the user would need to abort the connection before they can inspect the message buffer in any sort of UI built on top of JGit. Change-Id: Ibc215f4569e63071da5b7e5c6674ce924ae39e11 Signed-off-by: Shawn O. Pearce --- .../eclipse/jgit/http/test/HookMessageTest.java | 174 +++++++++++++++++++++ .../org/eclipse/jgit/pgm/AbstractFetchCommand.java | 30 +++- .../src/org/eclipse/jgit/pgm/Push.java | 1 + .../org/eclipse/jgit/transport/BaseConnection.java | 37 ++++- .../jgit/transport/BasePackFetchConnection.java | 2 +- .../jgit/transport/BasePackPushConnection.java | 30 +++- .../src/org/eclipse/jgit/transport/Connection.java | 20 +++ .../org/eclipse/jgit/transport/FetchProcess.java | 7 +- .../eclipse/jgit/transport/OperationResult.java | 29 ++++ .../org/eclipse/jgit/transport/PushProcess.java | 25 ++- .../jgit/transport/SideBandInputStream.java | 28 ++-- .../eclipse/jgit/transport/TransportGitSsh.java | 139 +++------------- .../org/eclipse/jgit/transport/TransportLocal.java | 79 ++++++---- .../org/eclipse/jgit/util/io/MessageWriter.java | 115 ++++++++++++++ .../io/StreamCopyThread.java} | 123 +++++++++------ 15 files changed, 600 insertions(+), 239 deletions(-) create mode 100644 org.eclipse.jgit.http.test/tst/org/eclipse/jgit/http/test/HookMessageTest.java create mode 100644 org.eclipse.jgit/src/org/eclipse/jgit/util/io/MessageWriter.java copy org.eclipse.jgit/src/org/eclipse/jgit/{transport/BaseConnection.java => util/io/StreamCopyThread.java} (50%) diff --git a/org.eclipse.jgit.http.test/tst/org/eclipse/jgit/http/test/HookMessageTest.java b/org.eclipse.jgit.http.test/tst/org/eclipse/jgit/http/test/HookMessageTest.java new file mode 100644 index 00000000..224ea05c --- /dev/null +++ b/org.eclipse.jgit.http.test/tst/org/eclipse/jgit/http/test/HookMessageTest.java @@ -0,0 +1,174 @@ +/* + * Copyright (C) 2010, Google Inc. + * and other copyright owners as documented in the project's IP log. + * + * This program and the accompanying materials are made available + * under the terms of the Eclipse Distribution License v1.0 which + * accompanies this distribution, is reproduced below, and is + * available at http://www.eclipse.org/org/documents/edl-v10.php + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * - Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials provided + * with the distribution. + * + * - Neither the name of the Eclipse Foundation, Inc. nor the + * names of its contributors may be used to endorse or promote + * products derived from this software without specific prior + * written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND + * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, + * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package org.eclipse.jgit.http.test; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import javax.servlet.http.HttpServletRequest; + +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jgit.errors.RepositoryNotFoundException; +import org.eclipse.jgit.http.server.GitServlet; +import org.eclipse.jgit.http.server.resolver.DefaultReceivePackFactory; +import org.eclipse.jgit.http.server.resolver.RepositoryResolver; +import org.eclipse.jgit.http.server.resolver.ServiceNotAuthorizedException; +import org.eclipse.jgit.http.server.resolver.ServiceNotEnabledException; +import org.eclipse.jgit.http.test.util.AccessEvent; +import org.eclipse.jgit.http.test.util.HttpTestCase; +import org.eclipse.jgit.junit.TestRepository; +import org.eclipse.jgit.lib.Constants; +import org.eclipse.jgit.lib.NullProgressMonitor; +import org.eclipse.jgit.lib.ObjectId; +import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.lib.RepositoryConfig; +import org.eclipse.jgit.revwalk.RevBlob; +import org.eclipse.jgit.revwalk.RevCommit; +import org.eclipse.jgit.transport.PreReceiveHook; +import org.eclipse.jgit.transport.PushResult; +import org.eclipse.jgit.transport.ReceiveCommand; +import org.eclipse.jgit.transport.ReceivePack; +import org.eclipse.jgit.transport.RemoteRefUpdate; +import org.eclipse.jgit.transport.Transport; +import org.eclipse.jgit.transport.URIish; + +public class HookMessageTest extends HttpTestCase { + private Repository remoteRepository; + + private URIish remoteURI; + + protected void setUp() throws Exception { + super.setUp(); + + final TestRepository src = createTestRepository(); + final String srcName = src.getRepository().getDirectory().getName(); + + ServletContextHandler app = server.addContext("/git"); + GitServlet gs = new GitServlet(); + gs.setRepositoryResolver(new RepositoryResolver() { + public Repository open(HttpServletRequest req, String name) + throws RepositoryNotFoundException, + ServiceNotEnabledException { + if (!name.equals(srcName)) + throw new RepositoryNotFoundException(name); + + final Repository db = src.getRepository(); + db.incrementOpen(); + return db; + } + }); + gs.setReceivePackFactory(new DefaultReceivePackFactory() { + public ReceivePack create(HttpServletRequest req, Repository db) + throws ServiceNotEnabledException, + ServiceNotAuthorizedException { + ReceivePack recv = super.create(req, db); + recv.setPreReceiveHook(new PreReceiveHook() { + public void onPreReceive(ReceivePack rp, + Collection commands) { + rp.sendMessage("message line 1"); + rp.sendError("no soup for you!"); + rp.sendMessage("come back next year!"); + } + }); + return recv; + } + + }); + app.addServlet(new ServletHolder(gs), "/*"); + + server.setUp(); + + remoteRepository = src.getRepository(); + remoteURI = toURIish(app, srcName); + + RepositoryConfig cfg = remoteRepository.getConfig(); + cfg.setBoolean("http", null, "receivepack", true); + cfg.save(); + } + + public void testPush_CreateBranch() throws Exception { + final TestRepository src = createTestRepository(); + final RevBlob Q_txt = src.blob("new text"); + final RevCommit Q = src.commit().add("Q", Q_txt).create(); + final Repository db = src.getRepository(); + final String dstName = Constants.R_HEADS + "new.branch"; + Transport t; + PushResult result; + + t = Transport.open(db, remoteURI); + try { + final String srcExpr = Q.name(); + final boolean forceUpdate = false; + final String localName = null; + final ObjectId oldId = null; + + RemoteRefUpdate update = new RemoteRefUpdate(src.getRepository(), + srcExpr, dstName, forceUpdate, localName, oldId); + result = t.push(NullProgressMonitor.INSTANCE, Collections + .singleton(update)); + } finally { + t.close(); + } + + assertTrue(remoteRepository.hasObject(Q_txt)); + assertNotNull("has " + dstName, remoteRepository.getRef(dstName)); + assertEquals(Q, remoteRepository.getRef(dstName).getObjectId()); + fsck(remoteRepository, Q); + + List requests = getRequests(); + assertEquals(2, requests.size()); + + AccessEvent service = requests.get(1); + assertEquals("POST", service.getMethod()); + assertEquals(join(remoteURI, "git-receive-pack"), service.getPath()); + assertEquals(200, service.getStatus()); + + assertEquals("message line 1\n" // + + "error: no soup for you!\n" // + + "come back next year!\n", // + result.getMessages()); + } +} diff --git a/org.eclipse.jgit.pgm/src/org/eclipse/jgit/pgm/AbstractFetchCommand.java b/org.eclipse.jgit.pgm/src/org/eclipse/jgit/pgm/AbstractFetchCommand.java index f5e3c504..1e035675 100644 --- a/org.eclipse.jgit.pgm/src/org/eclipse/jgit/pgm/AbstractFetchCommand.java +++ b/org.eclipse.jgit.pgm/src/org/eclipse/jgit/pgm/AbstractFetchCommand.java @@ -1,6 +1,6 @@ /* * Copyright (C) 2008, Charles O'Farrell - * Copyright (C) 2008-2009, Google Inc. + * Copyright (C) 2008-2010, Google Inc. * Copyright (C) 2008, Marek Zawirski * Copyright (C) 2008, Shawn O. Pearce * and other copyright owners as documented in the project's IP log. @@ -79,6 +79,34 @@ abstract class AbstractFetchCommand extends TextBuiltin { out.format(" %c %-17s %-10s -> %s", type, longType, src, dst); out.println(); } + + showRemoteMessages(r.getMessages()); + } + + static void showRemoteMessages(String pkt) { + while (0 < pkt.length()) { + final int lf = pkt.indexOf('\n'); + final int cr = pkt.indexOf('\r'); + final int s; + if (0 <= lf && 0 <= cr) + s = Math.min(lf, cr); + else if (0 <= lf) + s = lf; + else if (0 <= cr) + s = cr; + else { + System.err.println("remote: " + pkt); + break; + } + + if (pkt.charAt(s) == '\r') + System.err.print("remote: " + pkt.substring(0, s) + "\r"); + else + System.err.println("remote: " + pkt.substring(0, s)); + + pkt = pkt.substring(s + 1); + } + System.err.flush(); } private String longTypeOf(final TrackingRefUpdate u) { diff --git a/org.eclipse.jgit.pgm/src/org/eclipse/jgit/pgm/Push.java b/org.eclipse.jgit.pgm/src/org/eclipse/jgit/pgm/Push.java index 6248ec29..2c025456 100644 --- a/org.eclipse.jgit.pgm/src/org/eclipse/jgit/pgm/Push.java +++ b/org.eclipse.jgit.pgm/src/org/eclipse/jgit/pgm/Push.java @@ -162,6 +162,7 @@ class Push extends TextBuiltin { printRefUpdateResult(uri, result, rru); } + AbstractFetchCommand.showRemoteMessages(result.getMessages()); if (everythingUpToDate) out.println("Everything up-to-date"); } diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/BaseConnection.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/BaseConnection.java index 14be1700..1339b869 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/BaseConnection.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/BaseConnection.java @@ -1,4 +1,5 @@ /* + * Copyright (C) 2010, Google Inc. * Copyright (C) 2008, Marek Zawirski * Copyright (C) 2008, Robin Rosenberg * Copyright (C) 2008, Shawn O. Pearce @@ -45,6 +46,8 @@ package org.eclipse.jgit.transport; +import java.io.StringWriter; +import java.io.Writer; import java.util.Collection; import java.util.Collections; import java.util.Map; @@ -58,12 +61,13 @@ import org.eclipse.jgit.lib.Ref; * @see BasePackConnection * @see BaseFetchConnection */ -abstract class BaseConnection implements Connection { - +public abstract class BaseConnection implements Connection { private Map advertisedRefs = Collections.emptyMap(); private boolean startedOperation; + private Writer messageWriter; + public Map getRefsMap() { return advertisedRefs; } @@ -76,6 +80,10 @@ abstract class BaseConnection implements Connection { return advertisedRefs.get(name); } + public String getMessages() { + return messageWriter != null ? messageWriter.toString() : ""; + } + public abstract void close(); /** @@ -106,4 +114,29 @@ abstract class BaseConnection implements Connection { "Only one operation call per connection is supported."); startedOperation = true; } + + /** + * Get the writer that buffers messages from the remote side. + * + * @return writer to store messages from the remote. + */ + protected Writer getMessageWriter() { + if (messageWriter == null) + setMessageWriter(new StringWriter()); + return messageWriter; + } + + /** + * Set the writer that buffers messages from the remote side. + * + * @param writer + * the writer that messages will be delivered to. The writer's + * {@code toString()} method should be overridden to return the + * complete contents. + */ + protected void setMessageWriter(Writer writer) { + if (messageWriter != null) + throw new IllegalStateException("Writer already initialized"); + messageWriter = writer; + } } diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackFetchConnection.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackFetchConnection.java index dc9c7948..7b90ec19 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackFetchConnection.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackFetchConnection.java @@ -612,7 +612,7 @@ abstract class BasePackFetchConnection extends BasePackConnection implements InputStream input = in; if (sideband) - input = new SideBandInputStream(input, monitor); + input = new SideBandInputStream(input, monitor, getMessageWriter()); ip = IndexPack.create(local, input); ip.setFixThin(thinPack); diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackPushConnection.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackPushConnection.java index 6e8b05da..ba117074 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackPushConnection.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackPushConnection.java @@ -94,6 +94,8 @@ class BasePackPushConnection extends BasePackConnection implements private boolean capableReport; + private boolean capableSideBand; + private boolean capableOfsDelta; private boolean sentCommand; @@ -145,8 +147,21 @@ class BasePackPushConnection extends BasePackConnection implements writeCommands(refUpdates.values(), monitor); if (writePack) writePack(refUpdates, monitor); - if (sentCommand && capableReport) - readStatusReport(refUpdates); + if (sentCommand) { + if (capableReport) + readStatusReport(refUpdates); + if (capableSideBand) { + // Ensure the data channel is at EOF, so we know we have + // read all side-band data from all channels and have a + // complete copy of the messages (if any) buffered from + // the other data channels. + // + int b = in.read(); + if (0 <= b) + throw new TransportException(uri, "expected EOF;" + + " received '" + (char) b + "' instead"); + } + } } catch (TransportException e) { throw e; } catch (Exception e) { @@ -158,7 +173,7 @@ class BasePackPushConnection extends BasePackConnection implements private void writeCommands(final Collection refUpdates, final ProgressMonitor monitor) throws IOException { - final String capabilities = enableCapabilities(); + final String capabilities = enableCapabilities(monitor); for (final RemoteRefUpdate rru : refUpdates) { if (!capableDeleteRefs && rru.isDelete()) { rru.setStatus(Status.REJECTED_NODELETE); @@ -191,11 +206,18 @@ class BasePackPushConnection extends BasePackConnection implements outNeedsEnd = false; } - private String enableCapabilities() { + private String enableCapabilities(final ProgressMonitor monitor) { final StringBuilder line = new StringBuilder(); capableReport = wantCapability(line, CAPABILITY_REPORT_STATUS); capableDeleteRefs = wantCapability(line, CAPABILITY_DELETE_REFS); capableOfsDelta = wantCapability(line, CAPABILITY_OFS_DELTA); + + capableSideBand = wantCapability(line, CAPABILITY_SIDE_BAND_64K); + if (capableSideBand) { + in = new SideBandInputStream(in, monitor, getMessageWriter()); + pckIn = new PacketLineIn(in); + } + if (line.length() > 0) line.setCharAt(0, '\0'); return line.toString(); diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/Connection.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/Connection.java index 3f0c3d8e..e386c26c 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/Connection.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/Connection.java @@ -1,4 +1,5 @@ /* + * Copyright (C) 2010, Google Inc. * Copyright (C) 2008, Marek Zawirski * Copyright (C) 2008, Shawn O. Pearce * and other copyright owners as documented in the project's IP log. @@ -104,7 +105,26 @@ public interface Connection { * must close that network socket, disconnecting the two peers. If the * remote repository is actually local (same system) this method must close * any open file handles used to read the "remote" repository. + *

+ * If additional messages were produced by the remote peer, these should + * still be retained in the connection instance for {@link #getMessages()}. */ public void close(); + /** + * Get the additional messages, if any, returned by the remote process. + *

+ * These messages are most likely informational or error messages, sent by + * the remote peer, to help the end-user correct any problems that may have + * prevented the operation from completing successfully. Application UIs + * should try to show these in an appropriate context. + *

+ * The message buffer is available after {@link #close()} has been called. + * Prior to closing the connection, the message buffer may be empty. + * + * @return the messages returned by the remote, most likely terminated by a + * newline (LF) character. The empty string is returned if the + * remote produced no additional messages. + */ + public String getMessages(); } diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/FetchProcess.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/FetchProcess.java index 65a5b176..b86f86d2 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/FetchProcess.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/FetchProcess.java @@ -146,7 +146,7 @@ class FetchProcess { // Connection was used for object transfer. If we // do another fetch we must open a new connection. // - closeConnection(); + closeConnection(result); } else { includedTags = false; } @@ -170,7 +170,7 @@ class FetchProcess { } } } finally { - closeConnection(); + closeConnection(result); } final RevWalk walk = new RevWalk(transport.local); @@ -210,9 +210,10 @@ class FetchProcess { "peer did not supply a complete object graph"); } - private void closeConnection() { + private void closeConnection(final FetchResult result) { if (conn != null) { conn.close(); + result.addMessages(conn.getMessages()); conn = null; } } diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/OperationResult.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/OperationResult.java index e93f7f76..115cfbcc 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/OperationResult.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/OperationResult.java @@ -1,4 +1,5 @@ /* + * Copyright (C) 2010, Google Inc. * Copyright (C) 2008, Marek Zawirski * Copyright (C) 2007-2009, Robin Rosenberg * Copyright (C) 2008, Shawn O. Pearce @@ -65,6 +66,8 @@ public abstract class OperationResult { final SortedMap updates = new TreeMap(); + StringBuilder messageBuffer; + /** * Get the URI this result came from. *

@@ -136,4 +139,30 @@ public abstract class OperationResult { void add(final TrackingRefUpdate u) { updates.put(u.getLocalName(), u); } + + /** + * Get the additional messages, if any, returned by the remote process. + *

+ * These messages are most likely informational or error messages, sent by + * the remote peer, to help the end-user correct any problems that may have + * prevented the operation from completing successfully. Application UIs + * should try to show these in an appropriate context. + * + * @return the messages returned by the remote, most likely terminated by a + * newline (LF) character. The empty string is returned if the + * remote produced no additional messages. + */ + public String getMessages() { + return messageBuffer != null ? messageBuffer.toString() : ""; + } + + void addMessages(final String msg) { + if (msg != null && msg.length() > 0) { + if (messageBuffer == null) + messageBuffer = new StringBuilder(); + messageBuffer.append(msg); + if (!msg.endsWith("\n")) + messageBuffer.append('\n'); + } + } } diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/PushProcess.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/PushProcess.java index 17e1dfc7..03b78342 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/PushProcess.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/PushProcess.java @@ -122,8 +122,12 @@ class PushProcess { PushResult execute(final ProgressMonitor monitor) throws NotSupportedException, TransportException { monitor.beginTask(PROGRESS_OPENING_CONNECTION, ProgressMonitor.UNKNOWN); + + final PushResult res = new PushResult(); connection = transport.openPush(); try { + res.setAdvertisedRefs(transport.getURI(), connection.getRefsMap()); + res.setRemoteUpdates(toPush); monitor.endTask(); final Map preprocessed = prepareRemoteUpdates(); @@ -133,10 +137,16 @@ class PushProcess { connection.push(monitor, preprocessed); } finally { connection.close(); + res.addMessages(connection.getMessages()); } if (!transport.isDryRun()) updateTrackingRefs(); - return prepareOperationResult(); + for (final RemoteRefUpdate rru : toPush.values()) { + final TrackingRefUpdate tru = rru.getTrackingRefUpdate(); + if (tru != null) + res.add(tru); + } + return res; } private Map prepareRemoteUpdates() @@ -226,17 +236,4 @@ class PushProcess { } } } - - private PushResult prepareOperationResult() { - final PushResult result = new PushResult(); - result.setAdvertisedRefs(transport.getURI(), connection.getRefsMap()); - result.setRemoteUpdates(toPush); - - for (final RemoteRefUpdate rru : toPush.values()) { - final TrackingRefUpdate tru = rru.getTrackingRefUpdate(); - if (tru != null) - result.add(tru); - } - return result; - } } diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandInputStream.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandInputStream.java index 0abbe7e0..796cb745 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandInputStream.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandInputStream.java @@ -48,6 +48,7 @@ import static org.eclipse.jgit.transport.SideBandOutputStream.HDR_SIZE; import java.io.IOException; import java.io.InputStream; +import java.io.Writer; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -83,10 +84,10 @@ class SideBandInputStream extends InputStream { static final int CH_ERROR = 3; private static Pattern P_UNBOUNDED = Pattern - .compile("^([\\w ]+): +(\\d+)(?:, done\\.)? *$"); + .compile("^([\\w ]+): +(\\d+)(?:, done\\.)? *[\r\n]$"); private static Pattern P_BOUNDED = Pattern - .compile("^([\\w ]+): +\\d+% +\\( *(\\d+)/ *(\\d+)\\)(?:, done\\.)? *$"); + .compile("^([\\w ]+): +\\d+% +\\( *(\\d+)/ *(\\d+)\\)(?:, done\\.)? *[\r\n]$"); private final InputStream rawIn; @@ -94,6 +95,8 @@ class SideBandInputStream extends InputStream { private final ProgressMonitor monitor; + private final Writer messages; + private String progressBuffer = ""; private String currentTask; @@ -106,10 +109,12 @@ class SideBandInputStream extends InputStream { private int available; - SideBandInputStream(final InputStream in, final ProgressMonitor progress) { + SideBandInputStream(final InputStream in, final ProgressMonitor progress, + final Writer messageStream) { rawIn = in; pckIn = new PacketLineIn(rawIn); monitor = progress; + messages = messageStream; currentTask = ""; } @@ -170,7 +175,7 @@ class SideBandInputStream extends InputStream { } } - private void progress(String pkt) { + private void progress(String pkt) throws IOException { pkt = progressBuffer + pkt; for (;;) { final int lf = pkt.indexOf('\n'); @@ -185,16 +190,13 @@ class SideBandInputStream extends InputStream { else break; - final String msg = pkt.substring(0, s); - if (doProgressLine(msg)) - pkt = pkt.substring(s + 1); - else - break; + doProgressLine(pkt.substring(0, s + 1)); + pkt = pkt.substring(s + 1); } progressBuffer = pkt; } - private boolean doProgressLine(final String msg) { + private void doProgressLine(final String msg) throws IOException { Matcher matcher; matcher = P_BOUNDED.matcher(msg); @@ -208,7 +210,7 @@ class SideBandInputStream extends InputStream { final int cnt = Integer.parseInt(matcher.group(2)); monitor.update(cnt - lastCnt); lastCnt = cnt; - return true; + return; } matcher = P_UNBOUNDED.matcher(msg); @@ -222,10 +224,10 @@ class SideBandInputStream extends InputStream { final int cnt = Integer.parseInt(matcher.group(2)); monitor.update(cnt - lastCnt); lastCnt = cnt; - return true; + return; } - return false; + messages.write(msg); } private void beginTask(final int totalWorkUnits) { diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportGitSsh.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportGitSsh.java index 5ee7887f..c6930424 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportGitSsh.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportGitSsh.java @@ -1,5 +1,4 @@ /* - * Copyright (C) 2008-2010, Google Inc. * Copyright (C) 2008, Marek Zawirski * Copyright (C) 2008, Robin Rosenberg * Copyright (C) 2008, Shawn O. Pearce @@ -47,8 +46,6 @@ package org.eclipse.jgit.transport; import java.io.IOException; -import java.io.InputStream; -import java.io.InterruptedIOException; import java.io.OutputStream; import java.io.PipedInputStream; import java.io.PipedOutputStream; @@ -57,6 +54,8 @@ import org.eclipse.jgit.errors.NoRemoteRepositoryException; import org.eclipse.jgit.errors.TransportException; import org.eclipse.jgit.lib.Repository; import org.eclipse.jgit.util.QuotedString; +import org.eclipse.jgit.util.io.MessageWriter; +import org.eclipse.jgit.util.io.StreamCopyThread; import com.jcraft.jsch.ChannelExec; import com.jcraft.jsch.JSchException; @@ -88,8 +87,6 @@ public class TransportGitSsh extends SshTransport implements PackTransport { return false; } - OutputStream errStream; - TransportGitSsh(final Repository local, final URIish uri) { super(local, uri); } @@ -145,15 +142,15 @@ public class TransportGitSsh extends SshTransport implements PackTransport { return cmd.toString(); } - ChannelExec exec(final String exe) throws TransportException { + ChannelExec exec(final String exe, final OutputStream err) + throws TransportException { initSession(); final int tms = getTimeout() > 0 ? getTimeout() * 1000 : 0; try { final ChannelExec channel = (ChannelExec) sock.openChannel("exec"); channel.setCommand(commandFor(exe)); - errStream = createErrorStream(); - channel.setErrStream(errStream, true); + channel.setErrStream(err); channel.connect(tms); return channel; } catch (JSchException je) { @@ -161,9 +158,9 @@ public class TransportGitSsh extends SshTransport implements PackTransport { } } - void checkExecFailure(int status, String exe) throws TransportException { + void checkExecFailure(int status, String exe, String why) + throws TransportException { if (status == 127) { - String why = errStream.toString(); IOException cause = null; if (why != null && why.length() > 0) cause = new IOException(why); @@ -172,41 +169,8 @@ public class TransportGitSsh extends SshTransport implements PackTransport { } } - /** - * @return the error stream for the channel, the stream is used to detect - * specific error reasons for exceptions. - */ - private static OutputStream createErrorStream() { - return new OutputStream() { - private StringBuilder all = new StringBuilder(); - - private StringBuilder sb = new StringBuilder(); - - public String toString() { - String r = all.toString(); - while (r.endsWith("\n")) - r = r.substring(0, r.length() - 1); - return r; - } - - @Override - public void write(final int b) throws IOException { - if (b == '\r') { - return; - } - - sb.append((char) b); - - if (b == '\n') { - all.append(sb); - sb.setLength(0); - } - } - }; - } - - NoRemoteRepositoryException cleanNotFound(NoRemoteRepositoryException nf) { - String why = errStream.toString(); + NoRemoteRepositoryException cleanNotFound(NoRemoteRepositoryException nf, + String why) { if (why == null || why.length() == 0) return nf; @@ -235,7 +199,7 @@ public class TransportGitSsh extends SshTransport implements PackTransport { if (getTimeout() <= 0) return out; final PipedInputStream pipeIn = new PipedInputStream(); - final CopyThread copyThread = new CopyThread(pipeIn, out); + final StreamCopyThread copyThread = new StreamCopyThread(pipeIn, out); final PipedOutputStream pipeOut = new PipedOutputStream(pipeIn) { @Override public void flush() throws IOException { @@ -257,65 +221,6 @@ public class TransportGitSsh extends SshTransport implements PackTransport { return pipeOut; } - private static class CopyThread extends Thread { - private final InputStream src; - - private final OutputStream dst; - - private volatile boolean doFlush; - - CopyThread(final InputStream i, final OutputStream o) { - setName(Thread.currentThread().getName() + "-Output"); - src = i; - dst = o; - } - - void flush() { - if (!doFlush) { - doFlush = true; - interrupt(); - } - } - - @Override - public void run() { - try { - final byte[] buf = new byte[1024]; - for (;;) { - try { - if (doFlush) { - doFlush = false; - dst.flush(); - } - - final int n; - try { - n = src.read(buf); - } catch (InterruptedIOException wakey) { - continue; - } - if (n < 0) - break; - dst.write(buf, 0, n); - } catch (IOException e) { - break; - } - } - } finally { - try { - src.close(); - } catch (IOException e) { - // Ignore IO errors on close - } - try { - dst.close(); - } catch (IOException e) { - // Ignore IO errors on close - } - } - } - } - class SshFetchConnection extends BasePackFetchConnection { private ChannelExec channel; @@ -324,12 +229,14 @@ public class TransportGitSsh extends SshTransport implements PackTransport { SshFetchConnection() throws TransportException { super(TransportGitSsh.this); try { - channel = exec(getOptionUploadPack()); + final MessageWriter msg = new MessageWriter(); + setMessageWriter(msg); + channel = exec(getOptionUploadPack(), msg.getRawStream()); if (channel.isConnected()) init(channel.getInputStream(), outputStream(channel)); else - throw new TransportException(uri, errStream.toString()); + throw new TransportException(uri, getMessages()); } catch (TransportException err) { close(); @@ -343,9 +250,9 @@ public class TransportGitSsh extends SshTransport implements PackTransport { try { readAdvertisedRefs(); } catch (NoRemoteRepositoryException notFound) { - close(); - checkExecFailure(exitStatus, getOptionUploadPack()); - throw cleanNotFound(notFound); + final String msgs = getMessages(); + checkExecFailure(exitStatus, getOptionUploadPack(), msgs); + throw cleanNotFound(notFound, msgs); } } @@ -373,12 +280,14 @@ public class TransportGitSsh extends SshTransport implements PackTransport { SshPushConnection() throws TransportException { super(TransportGitSsh.this); try { - channel = exec(getOptionReceivePack()); + final MessageWriter msg = new MessageWriter(); + setMessageWriter(msg); + channel = exec(getOptionReceivePack(), msg.getRawStream()); if (channel.isConnected()) init(channel.getInputStream(), outputStream(channel)); else - throw new TransportException(uri, errStream.toString()); + throw new TransportException(uri, getMessages()); } catch (TransportException err) { close(); @@ -392,9 +301,9 @@ public class TransportGitSsh extends SshTransport implements PackTransport { try { readAdvertisedRefs(); } catch (NoRemoteRepositoryException notFound) { - close(); - checkExecFailure(exitStatus, getOptionReceivePack()); - throw cleanNotFound(notFound); + final String msgs = getMessages(); + checkExecFailure(exitStatus, getOptionReceivePack(), msgs); + throw cleanNotFound(notFound, msgs); } } diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportLocal.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportLocal.java index a99a9b41..a9bdcd80 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportLocal.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportLocal.java @@ -1,6 +1,6 @@ /* * Copyright (C) 2007, Dave Watson - * Copyright (C) 2008-2009, Google Inc. + * Copyright (C) 2008-2010, Google Inc. * Copyright (C) 2008, Marek Zawirski * Copyright (C) 2008, Robin Rosenberg * Copyright (C) 2008, Shawn O. Pearce @@ -59,6 +59,8 @@ import org.eclipse.jgit.errors.TransportException; import org.eclipse.jgit.lib.Constants; import org.eclipse.jgit.lib.Repository; import org.eclipse.jgit.util.FS; +import org.eclipse.jgit.util.io.MessageWriter; +import org.eclipse.jgit.util.io.StreamCopyThread; /** * Transport to access a local directory as though it were a remote peer. @@ -129,11 +131,10 @@ class TransportLocal extends Transport implements PackTransport { // Resources must be established per-connection. } - protected Process startProcessWithErrStream(final String cmd) + protected Process spawn(final String cmd) throws TransportException { try { final String[] args; - final Process proc; if (cmd.startsWith("git-")) { args = new String[] { "git", cmd.substring(4), PWD }; @@ -148,9 +149,7 @@ class TransportLocal extends Transport implements PackTransport { } } - proc = Runtime.getRuntime().exec(args, null, remoteGitDir); - new StreamRewritingThread(cmd, proc.getErrorStream()).start(); - return proc; + return Runtime.getRuntime().exec(args, null, remoteGitDir); } catch (IOException err) { throw new TransportException(uri, err.getMessage(), err); } @@ -246,9 +245,20 @@ class TransportLocal extends Transport implements PackTransport { class ForkLocalFetchConnection extends BasePackFetchConnection { private Process uploadPack; + private Thread errorReaderThread; + ForkLocalFetchConnection() throws TransportException { super(TransportLocal.this); - uploadPack = startProcessWithErrStream(getOptionUploadPack()); + + final MessageWriter msg = new MessageWriter(); + setMessageWriter(msg); + + uploadPack = spawn(getOptionUploadPack()); + + final InputStream upErr = uploadPack.getErrorStream(); + errorReaderThread = new StreamCopyThread(upErr, msg.getRawStream()); + errorReaderThread.start(); + final InputStream upIn = uploadPack.getInputStream(); final OutputStream upOut = uploadPack.getOutputStream(); init(upIn, upOut); @@ -268,6 +278,16 @@ class TransportLocal extends Transport implements PackTransport { uploadPack = null; } } + + if (errorReaderThread != null) { + try { + errorReaderThread.join(); + } catch (InterruptedException e) { + // Stop waiting and return anyway. + } finally { + errorReaderThread = null; + } + } } } @@ -351,9 +371,20 @@ class TransportLocal extends Transport implements PackTransport { class ForkLocalPushConnection extends BasePackPushConnection { private Process receivePack; + private Thread errorReaderThread; + ForkLocalPushConnection() throws TransportException { super(TransportLocal.this); - receivePack = startProcessWithErrStream(getOptionReceivePack()); + + final MessageWriter msg = new MessageWriter(); + setMessageWriter(msg); + + receivePack = spawn(getOptionReceivePack()); + + final InputStream rpErr = receivePack.getErrorStream(); + errorReaderThread = new StreamCopyThread(rpErr, msg.getRawStream()); + errorReaderThread.start(); + final InputStream rpIn = receivePack.getInputStream(); final OutputStream rpOut = receivePack.getOutputStream(); init(rpIn, rpOut); @@ -373,34 +404,14 @@ class TransportLocal extends Transport implements PackTransport { receivePack = null; } } - } - } - - static class StreamRewritingThread extends Thread { - private final InputStream in; - StreamRewritingThread(final String cmd, final InputStream in) { - super("JGit " + cmd + " Errors"); - this.in = in; - } - - public void run() { - final byte[] tmp = new byte[512]; - try { - for (;;) { - final int n = in.read(tmp); - if (n < 0) - break; - System.err.write(tmp, 0, n); - System.err.flush(); - } - } catch (IOException err) { - // Ignore errors reading errors. - } finally { + if (errorReaderThread != null) { try { - in.close(); - } catch (IOException err2) { - // Ignore errors closing the pipe. + errorReaderThread.join(); + } catch (InterruptedException e) { + // Stop waiting and return anyway. + } finally { + errorReaderThread = null; } } } diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/util/io/MessageWriter.java b/org.eclipse.jgit/src/org/eclipse/jgit/util/io/MessageWriter.java new file mode 100644 index 00000000..22c3ce94 --- /dev/null +++ b/org.eclipse.jgit/src/org/eclipse/jgit/util/io/MessageWriter.java @@ -0,0 +1,115 @@ +/* + * Copyright (C) 2009-2010, Google Inc. + * and other copyright owners as documented in the project's IP log. + * + * This program and the accompanying materials are made available + * under the terms of the Eclipse Distribution License v1.0 which + * accompanies this distribution, is reproduced below, and is + * available at http://www.eclipse.org/org/documents/edl-v10.php + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * - Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials provided + * with the distribution. + * + * - Neither the name of the Eclipse Foundation, Inc. nor the + * names of its contributors may be used to endorse or promote + * products derived from this software without specific prior + * written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND + * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, + * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package org.eclipse.jgit.util.io; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; + +import org.eclipse.jgit.lib.Constants; +import org.eclipse.jgit.transport.BaseConnection; +import org.eclipse.jgit.util.RawParseUtils; + +/** + * Combines messages from an OutputStream (hopefully in UTF-8) and a Writer. + *

+ * This class is primarily meant for {@link BaseConnection} in contexts where a + * standard error stream from a command execution, as well as messages from a + * side-band channel, need to be combined together into a buffer to represent + * the complete set of messages from a remote repository. + *

+ * Writes made to the writer are re-encoded as UTF-8 and interleaved into the + * buffer that {@link #getRawStream()} also writes to. + *

+ * {@link #toString()} returns all written data, after converting it to a String + * under the assumption of UTF-8 encoding. + *

+ * Internally {@link RawParseUtils#decode(byte[])} is used by {@code toString()} + * tries to work out a reasonably correct character set for the raw data. + */ +public class MessageWriter extends Writer { + private final ByteArrayOutputStream buf; + + private final OutputStreamWriter enc; + + /** Create an empty writer. */ + public MessageWriter() { + buf = new ByteArrayOutputStream(); + enc = new OutputStreamWriter(getRawStream(), Constants.CHARSET); + } + + @Override + public void write(char[] cbuf, int off, int len) throws IOException { + synchronized (buf) { + enc.write(cbuf, off, len); + enc.flush(); + } + } + + /** + * @return the underlying byte stream that character writes to this writer + * drop into. Writes to this stream should should be in UTF-8. + */ + public OutputStream getRawStream() { + return buf; + } + + @Override + public void close() throws IOException { + // Do nothing, we are buffered with no resources. + } + + @Override + public void flush() throws IOException { + // Do nothing, we are buffered with no resources. + } + + /** @return string version of all buffered data. */ + @Override + public String toString() { + return RawParseUtils.decode(buf.toByteArray()); + } +} diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/BaseConnection.java b/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java similarity index 50% copy from org.eclipse.jgit/src/org/eclipse/jgit/transport/BaseConnection.java copy to org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java index 14be1700..a2b95401 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/BaseConnection.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java @@ -1,7 +1,5 @@ /* - * Copyright (C) 2008, Marek Zawirski - * Copyright (C) 2008, Robin Rosenberg - * Copyright (C) 2008, Shawn O. Pearce + * Copyright (C) 2009-2010, Google Inc. * and other copyright owners as documented in the project's IP log. * * This program and the accompanying materials are made available @@ -43,67 +41,88 @@ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -package org.eclipse.jgit.transport; +package org.eclipse.jgit.util.io; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.io.OutputStream; -import org.eclipse.jgit.errors.TransportException; -import org.eclipse.jgit.lib.Ref; +/** Thread to copy from an input stream to an output stream. */ +public class StreamCopyThread extends Thread { + private static final int BUFFER_SIZE = 1024; -/** - * Base helper class for implementing operations connections. - * - * @see BasePackConnection - * @see BaseFetchConnection - */ -abstract class BaseConnection implements Connection { - - private Map advertisedRefs = Collections.emptyMap(); - - private boolean startedOperation; + private final InputStream src; - public Map getRefsMap() { - return advertisedRefs; - } - - public final Collection getRefs() { - return advertisedRefs.values(); - } + private final OutputStream dst; - public final Ref getRef(final String name) { - return advertisedRefs.get(name); - } - - public abstract void close(); + private volatile boolean doFlush; /** - * Denote the list of refs available on the remote repository. - *

- * Implementors should invoke this method once they have obtained the refs - * that are available from the remote repository. + * Create a thread to copy data from an input stream to an output stream. * - * @param all - * the complete list of refs the remote has to offer. This map - * will be wrapped in an unmodifiable way to protect it, but it - * does not get copied. + * @param i + * stream to copy from. The thread terminates when this stream + * reaches EOF. The thread closes this stream before it exits. + * @param o + * stream to copy into. The destination stream is automatically + * closed when the thread terminates. */ - protected void available(final Map all) { - advertisedRefs = Collections.unmodifiableMap(all); + public StreamCopyThread(final InputStream i, final OutputStream o) { + setName(Thread.currentThread().getName() + "-StreamCopy"); + src = i; + dst = o; } /** - * Helper method for ensuring one-operation per connection. Check whether - * operation was already marked as started, and mark it as started. - * - * @throws TransportException - * if operation was already marked as started. + * Request the thread to flush the output stream as soon as possible. + *

+ * This is an asynchronous request to the thread. The actual flush will + * happen at some future point in time, when the thread wakes up to process + * the request. */ - protected void markStartedOperation() throws TransportException { - if (startedOperation) - throw new TransportException( - "Only one operation call per connection is supported."); - startedOperation = true; + public void flush() { + if (!doFlush) { + doFlush = true; + interrupt(); + } + } + + @Override + public void run() { + try { + final byte[] buf = new byte[BUFFER_SIZE]; + for (;;) { + try { + if (doFlush) { + doFlush = false; + dst.flush(); + } + + final int n; + try { + n = src.read(buf); + } catch (InterruptedIOException wakey) { + continue; + } + if (n < 0) + break; + dst.write(buf, 0, n); + } catch (IOException e) { + break; + } + } + } finally { + try { + src.close(); + } catch (IOException e) { + // Ignore IO errors on close + } + try { + dst.close(); + } catch (IOException e) { + // Ignore IO errors on close + } + } } } -- 2.11.4.GIT