update copyright
[fedora-idea.git] / plugins / cvs / cvs-core / src / com / intellij / cvsSupport2 / javacvsImpl / io / ReadThread.java
blob8961e3f4d6c4ff9ef9cf726721c2f8c5304ad590
1 /*
2 * Copyright 2000-2009 JetBrains s.r.o.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package com.intellij.cvsSupport2.javacvsImpl.io;
18 import com.intellij.openapi.diagnostic.Logger;
19 import com.intellij.openapi.progress.ProcessCanceledException;
20 import com.intellij.util.concurrency.Semaphore;
21 import org.jetbrains.annotations.NonNls;
22 import org.netbeans.lib.cvsclient.ICvsCommandStopper;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.util.ArrayList;
27 import java.util.Collection;
29 /**
30 * author: lesya
33 public class ReadThread implements Runnable {
35 public final static Collection<ReadThread> READ_THREADS = new ArrayList<ReadThread>();
37 private static final Logger LOG = Logger.getInstance("#com.intellij.cvsSupport2.javacvsImpl.io.ReadThread");
39 private boolean myAtEndOfStream = false;
40 private final ICvsCommandStopper myCvsCommandStopper;
41 private static final int INITIAL_BUFFER_SIZE = 128 * 1024;
42 private final byte[] myBuffer = new byte[INITIAL_BUFFER_SIZE];
43 private final byte[] myReadBuffer = new byte[INITIAL_BUFFER_SIZE];
44 private int myFirstIndex = 0;
45 private int myLastIndex = 0;
46 private IOException myException;
47 private final InputStream myInputStream;
48 private final Semaphore myStarted = new Semaphore();
49 public static final int TIMEOUT = 3000;
50 public static final int END_OF_STREAM = -1;
51 private boolean myIsClosed = false;
52 @NonNls private static final String NAME = "CvsReadThread";
/**
 * Creates a reader for the given stream and registers it in {@code READ_THREADS}.
 * The instance does nothing until {@link #run()} is executed on some thread.
 *
 * @param inputStream       stream the reader thread pulls bytes from
 * @param cvsCommandStopper queried while waiting for data to detect an aborted command
 */
public ReadThread(InputStream inputStream, ICvsCommandStopper cvsCommandStopper) {
  this.myInputStream = inputStream;
  this.myCvsCommandStopper = cvsCommandStopper;
  // NOTE(review): this publishes a not-yet-fully-constructed object through a public static collection.
  READ_THREADS.add(this);
}
60 public void waitForStart(){
61 myStarted.down();
62 myStarted.waitFor();
65 public String toString() {
66 @NonNls StringBuffer buffer = new StringBuffer();
67 buffer.append(super.toString());
68 buffer.append(", atEnd: ");
69 buffer.append(myAtEndOfStream);
70 buffer.append(", firstIndex: ");
71 buffer.append(myFirstIndex);
72 buffer.append(", lastIndex: ");
73 buffer.append(myLastIndex);
74 buffer.append(", exception: ");
75 buffer.append(myException);
76 buffer.append(", closed: ");
77 buffer.append(myIsClosed);
78 return buffer.toString();
81 public void run() {
82 Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
83 try {
84 LOG.info("Starting CvsReadThread " + this);
85 while (true) {
86 try {
87 waitForRead();
88 if (myAtEndOfStream || (myException != null)) {
89 executionCompleted();
90 return;
92 int result = myInputStream.read(myReadBuffer);
93 if (result > 0) {
94 writeAndNotify(result);
96 else if (result == END_OF_STREAM) {
97 detectEndAndNotify();
98 return;
101 catch (IOException e) {
102 detectExceptionAndNotify(e);
103 return;
105 catch (Throwable t) {
106 detectExceptionAndNotify(new IOException(t.getLocalizedMessage()));
107 return;
111 finally {
112 Thread.currentThread().setPriority(Thread.NORM_PRIORITY);
116 public synchronized int read() throws IOException {
117 int result = waitForAvailableBytes();
118 if (result == END_OF_STREAM) return END_OF_STREAM;
119 return internalRead();
122 public synchronized int read(byte b[], int off, int len) throws IOException {
123 int result = waitForAvailableBytes();
124 if (result == END_OF_STREAM) return END_OF_STREAM;
125 return internalRead(b, off, len);
128 public synchronized long skip(long n) throws IOException {
129 int result = waitForAvailableBytes();
130 if (result == END_OF_STREAM) return END_OF_STREAM;
131 return internalSkip(n);
134 public synchronized int available() throws IOException {
135 if (size() > 0) return size();
136 if (myAtEndOfStream) return END_OF_STREAM;
137 return myInputStream.available();
140 private int waitForAvailableBytes() throws IOException {
141 while (size() == 0 && !myAtEndOfStream) {
142 try {
143 notify();
144 wait(TIMEOUT);
146 catch (InterruptedException e) {
147 throw new IOException(e.getLocalizedMessage());
149 if (size() == 0 && !myAtEndOfStream) {
150 if (myCvsCommandStopper.isAborted()) {
151 throw new ProcessCanceledException();
155 if (myException != null) throw myException;
156 if (myAtEndOfStream && (size() == 0)) {
157 return END_OF_STREAM;
159 return -2;
162 private synchronized void detectExceptionAndNotify(IOException e) {
163 LOG.info(e);
164 myException = e;
165 executionCompleted();
166 notify();
169 private synchronized void detectEndAndNotify() {
170 if (!myAtEndOfStream) {
171 myAtEndOfStream = true;
172 notify();
174 executionCompleted();
177 private synchronized void writeAndNotify(int result) {
178 synchronized (this) {
179 if (size() == 0) {
180 myFirstIndex = 0;
181 myLastIndex = 0;
183 System.arraycopy(myReadBuffer, 0, myBuffer, myLastIndex, result);
184 myLastIndex += result;
186 notify();
189 private synchronized void waitForRead() throws InterruptedException {
190 myStarted.up();
191 if (myAtEndOfStream || (myException != null)) {
192 return;
194 wait();
197 private void executionCompleted() {
198 READ_THREADS.remove(this);
199 LOG.info("Stopping CvsReadThread " + this);
202 private int size() {
203 return myLastIndex - myFirstIndex;
206 public synchronized void close() throws IOException {
207 myIsClosed = true;
208 if (myAtEndOfStream) return;
209 myAtEndOfStream = true;
210 notify();
213 private synchronized int internalRead() {
214 try {
215 return (char)myBuffer[myFirstIndex++];
217 finally {
218 if (myFirstIndex > myLastIndex) {
219 LOG.assertTrue(false);
224 private synchronized int internalRead(byte b[], int off, int len) {
225 int result = Math.min(len, size());
226 System.arraycopy(myBuffer, myFirstIndex, b, off, result);
227 myFirstIndex += result;
228 return result;
231 private long internalSkip(long n) {
232 long result = Math.min(n, size());
233 myFirstIndex += result;
234 return result;