Reduce multi-level buffered streams in transport code
[jgit/MarioXXX.git] / org.eclipse.jgit / src / org / eclipse / jgit / transport / BasePackPushConnection.java
blobe10cefd3abe6d4870c6f4d6b8162f48a67bf5136
1 /*
2 * Copyright (C) 2008, Marek Zawirski <marek.zawirski@gmail.com>
3 * Copyright (C) 2008, Shawn O. Pearce <spearce@spearce.org>
4 * and other copyright owners as documented in the project's IP log.
6 * This program and the accompanying materials are made available
7 * under the terms of the Eclipse Distribution License v1.0 which
8 * accompanies this distribution, is reproduced below, and is
9 * available at http://www.eclipse.org/org/documents/edl-v10.php
11 * All rights reserved.
13 * Redistribution and use in source and binary forms, with or
14 * without modification, are permitted provided that the following
15 * conditions are met:
17 * - Redistributions of source code must retain the above copyright
18 * notice, this list of conditions and the following disclaimer.
20 * - Redistributions in binary form must reproduce the above
21 * copyright notice, this list of conditions and the following
22 * disclaimer in the documentation and/or other materials provided
23 * with the distribution.
25 * - Neither the name of the Eclipse Foundation, Inc. nor the
26 * names of its contributors may be used to endorse or promote
27 * products derived from this software without specific prior
28 * written permission.
30 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
31 * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
32 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
33 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
34 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
35 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
36 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
37 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
38 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
39 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
40 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
41 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
42 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
45 package org.eclipse.jgit.transport;
47 import java.io.IOException;
48 import java.util.ArrayList;
49 import java.util.Collection;
50 import java.util.Map;
52 import org.eclipse.jgit.errors.NoRemoteRepositoryException;
53 import org.eclipse.jgit.errors.NotSupportedException;
54 import org.eclipse.jgit.errors.PackProtocolException;
55 import org.eclipse.jgit.errors.TransportException;
56 import org.eclipse.jgit.lib.ObjectId;
57 import org.eclipse.jgit.lib.PackWriter;
58 import org.eclipse.jgit.lib.ProgressMonitor;
59 import org.eclipse.jgit.lib.Ref;
60 import org.eclipse.jgit.transport.RemoteRefUpdate.Status;
/**
 * Push implementation using the native Git pack transfer service.
 * <p>
 * This is the canonical implementation for transferring objects to the remote
 * repository from the local repository by talking to the 'git-receive-pack'
 * service. Objects are packed on the local side into a pack file and then sent
 * to the remote repository.
 * <p>
 * This connection requires only a bi-directional pipe or socket, and thus is
 * easily wrapped up into a local process pipe, anonymous TCP socket, or a
 * command executed through an SSH tunnel.
 * <p>
 * This implementation honors the {@link Transport#isPushThin()} option.
 * <p>
 * Concrete implementations should just call the
 * {@link #init(java.io.InputStream, java.io.OutputStream)} and
 * {@link #readAdvertisedRefs()} methods in their constructor or before any
 * use. They should also release resources in the {@link #close()} method if
 * needed.
 */
81 class BasePackPushConnection extends BasePackConnection implements
82 PushConnection {
83 static final String CAPABILITY_REPORT_STATUS = "report-status";
85 static final String CAPABILITY_DELETE_REFS = "delete-refs";
87 static final String CAPABILITY_OFS_DELTA = "ofs-delta";
89 static final String CAPABILITY_SIDE_BAND_64K = "side-band-64k";
91 private final boolean thinPack;
93 private boolean capableDeleteRefs;
95 private boolean capableReport;
97 private boolean capableSideBand;
99 private boolean capableOfsDelta;
101 private boolean sentCommand;
103 private boolean writePack;
105 /** Time in milliseconds spent transferring the pack data. */
106 private long packTransferTime;
/**
 * Create a push connection on top of the given pack transport.
 * <p>
 * The thin-pack preference is read from the transport once, here, and kept
 * for the lifetime of the connection.
 *
 * @param packTransport
 *            transport handed to the superclass constructor.
 */
BasePackPushConnection(final PackTransport packTransport) {
    super(packTransport);
    this.thinPack = transport.isPushThin();
}
113 public void push(final ProgressMonitor monitor,
114 final Map<String, RemoteRefUpdate> refUpdates)
115 throws TransportException {
116 markStartedOperation();
117 doPush(monitor, refUpdates);
120 @Override
121 protected TransportException noRepository() {
122 // Sadly we cannot tell the "invalid URI" case from "push not allowed".
123 // Opening a fetch connection can help us tell the difference, as any
124 // useful repository is going to support fetch if it also would allow
125 // push. So if fetch throws NoRemoteRepositoryException we know the
126 // URI is wrong. Otherwise we can correctly state push isn't allowed
127 // as the fetch connection opened successfully.
129 try {
130 transport.openFetch().close();
131 } catch (NotSupportedException e) {
132 // Fall through.
133 } catch (NoRemoteRepositoryException e) {
134 // Fetch concluded the repository doesn't exist.
136 return e;
137 } catch (TransportException e) {
138 // Fall through.
140 return new TransportException(uri, "push not permitted");
143 protected void doPush(final ProgressMonitor monitor,
144 final Map<String, RemoteRefUpdate> refUpdates)
145 throws TransportException {
146 try {
147 writeCommands(refUpdates.values(), monitor);
148 if (writePack)
149 writePack(refUpdates, monitor);
150 if (sentCommand) {
151 if (capableReport)
152 readStatusReport(refUpdates);
153 if (capableSideBand) {
154 // Ensure the data channel is at EOF, so we know we have
155 // read all side-band data from all channels and have a
156 // complete copy of the messages (if any) buffered from
157 // the other data channels.
159 int b = in.read();
160 if (0 <= b)
161 throw new TransportException(uri, "expected EOF;"
162 + " received '" + (char) b + "' instead");
165 } catch (TransportException e) {
166 throw e;
167 } catch (Exception e) {
168 throw new TransportException(uri, e.getMessage(), e);
169 } finally {
170 close();
174 private void writeCommands(final Collection<RemoteRefUpdate> refUpdates,
175 final ProgressMonitor monitor) throws IOException {
176 final String capabilities = enableCapabilities(monitor);
177 for (final RemoteRefUpdate rru : refUpdates) {
178 if (!capableDeleteRefs && rru.isDelete()) {
179 rru.setStatus(Status.REJECTED_NODELETE);
180 continue;
183 final StringBuilder sb = new StringBuilder();
184 final Ref advertisedRef = getRef(rru.getRemoteName());
185 final ObjectId oldId = (advertisedRef == null ? ObjectId.zeroId()
186 : advertisedRef.getObjectId());
187 sb.append(oldId.name());
188 sb.append(' ');
189 sb.append(rru.getNewObjectId().name());
190 sb.append(' ');
191 sb.append(rru.getRemoteName());
192 if (!sentCommand) {
193 sentCommand = true;
194 sb.append(capabilities);
197 pckOut.writeString(sb.toString());
198 rru.setStatus(Status.AWAITING_REPORT);
199 if (!rru.isDelete())
200 writePack = true;
203 if (monitor.isCancelled())
204 throw new TransportException(uri, "push cancelled");
205 pckOut.end();
206 outNeedsEnd = false;
209 private String enableCapabilities(final ProgressMonitor monitor) {
210 final StringBuilder line = new StringBuilder();
211 capableReport = wantCapability(line, CAPABILITY_REPORT_STATUS);
212 capableDeleteRefs = wantCapability(line, CAPABILITY_DELETE_REFS);
213 capableOfsDelta = wantCapability(line, CAPABILITY_OFS_DELTA);
215 capableSideBand = wantCapability(line, CAPABILITY_SIDE_BAND_64K);
216 if (capableSideBand) {
217 in = new SideBandInputStream(in, monitor, getMessageWriter());
218 pckIn = new PacketLineIn(in);
221 if (line.length() > 0)
222 line.setCharAt(0, '\0');
223 return line.toString();
226 private void writePack(final Map<String, RemoteRefUpdate> refUpdates,
227 final ProgressMonitor monitor) throws IOException {
228 final PackWriter writer = new PackWriter(local, monitor);
229 final ArrayList<ObjectId> remoteObjects = new ArrayList<ObjectId>(
230 getRefs().size());
231 final ArrayList<ObjectId> newObjects = new ArrayList<ObjectId>(
232 refUpdates.size());
234 for (final Ref r : getRefs())
235 remoteObjects.add(r.getObjectId());
236 remoteObjects.addAll(additionalHaves);
237 for (final RemoteRefUpdate r : refUpdates.values()) {
238 if (!ObjectId.zeroId().equals(r.getNewObjectId()))
239 newObjects.add(r.getNewObjectId());
242 writer.setThin(thinPack);
243 writer.setDeltaBaseAsOffset(capableOfsDelta);
244 writer.preparePack(newObjects, remoteObjects);
245 final long start = System.currentTimeMillis();
246 writer.writePack(out);
247 out.flush();
248 packTransferTime = System.currentTimeMillis() - start;
251 private void readStatusReport(final Map<String, RemoteRefUpdate> refUpdates)
252 throws IOException {
253 final String unpackLine = readStringLongTimeout();
254 if (!unpackLine.startsWith("unpack "))
255 throw new PackProtocolException(uri, "unexpected report line: "
256 + unpackLine);
257 final String unpackStatus = unpackLine.substring("unpack ".length());
258 if (!unpackStatus.equals("ok"))
259 throw new TransportException(uri,
260 "error occurred during unpacking on the remote end: "
261 + unpackStatus);
263 String refLine;
264 while ((refLine = pckIn.readString()) != PacketLineIn.END) {
265 boolean ok = false;
266 int refNameEnd = -1;
267 if (refLine.startsWith("ok ")) {
268 ok = true;
269 refNameEnd = refLine.length();
270 } else if (refLine.startsWith("ng ")) {
271 ok = false;
272 refNameEnd = refLine.indexOf(" ", 3);
274 if (refNameEnd == -1)
275 throw new PackProtocolException(uri
276 + ": unexpected report line: " + refLine);
277 final String refName = refLine.substring(3, refNameEnd);
278 final String message = (ok ? null : refLine
279 .substring(refNameEnd + 1));
281 final RemoteRefUpdate rru = refUpdates.get(refName);
282 if (rru == null)
283 throw new PackProtocolException(uri
284 + ": unexpected ref report: " + refName);
285 if (ok) {
286 rru.setStatus(Status.OK);
287 } else {
288 rru.setStatus(Status.REJECTED_OTHER_REASON);
289 rru.setMessage(message);
292 for (final RemoteRefUpdate rru : refUpdates.values()) {
293 if (rru.getStatus() == Status.AWAITING_REPORT)
294 throw new PackProtocolException(uri
295 + ": expected report for ref " + rru.getRemoteName()
296 + " not received");
300 private String readStringLongTimeout() throws IOException {
301 if (timeoutIn == null)
302 return pckIn.readString();
304 // The remote side may need a lot of time to choke down the pack
305 // we just sent them. There may be many deltas that need to be
306 // resolved by the remote. Its hard to say how long the other
307 // end is going to be silent. Taking 10x the configured timeout
308 // or the time spent transferring the pack, whichever is larger,
309 // gives the other side some reasonable window to process the data,
310 // but this is just a wild guess.
312 final int oldTimeout = timeoutIn.getTimeout();
313 final int sendTime = (int) Math.min(packTransferTime, 28800000L);
314 try {
315 timeoutIn.setTimeout(10 * Math.max(sendTime, oldTimeout));
316 return pckIn.readString();
317 } finally {
318 timeoutIn.setTimeout(oldTimeout);