2 * Copyright 2000-2009 JetBrains s.r.o.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package com
.intellij
.cvsSupport2
.javacvsImpl
.io
;
18 import com
.intellij
.openapi
.diagnostic
.Logger
;
19 import com
.intellij
.openapi
.progress
.ProcessCanceledException
;
20 import com
.intellij
.util
.concurrency
.Semaphore
;
21 import org
.jetbrains
.annotations
.NonNls
;
22 import org
.netbeans
.lib
.cvsclient
.ICvsCommandStopper
;
24 import java
.io
.IOException
;
25 import java
.io
.InputStream
;
26 import java
.util
.ArrayList
;
27 import java
.util
.Collection
;
33 public class ReadThread
implements Runnable
{
35 public final static Collection
<ReadThread
> READ_THREADS
= new ArrayList
<ReadThread
>();
37 private static final Logger LOG
= Logger
.getInstance("#com.intellij.cvsSupport2.javacvsImpl.io.ReadThread");
39 private boolean myAtEndOfStream
= false;
40 private final ICvsCommandStopper myCvsCommandStopper
;
41 private static final int INITIAL_BUFFER_SIZE
= 128 * 1024;
42 private final byte[] myBuffer
= new byte[INITIAL_BUFFER_SIZE
];
43 private final byte[] myReadBuffer
= new byte[INITIAL_BUFFER_SIZE
];
44 private int myFirstIndex
= 0;
45 private int myLastIndex
= 0;
46 private IOException myException
;
47 private final InputStream myInputStream
;
48 private final Semaphore myStarted
= new Semaphore();
49 public static final int TIMEOUT
= 3000;
50 public static final int END_OF_STREAM
= -1;
51 private boolean myIsClosed
= false;
52 @NonNls private static final String NAME
= "CvsReadThread";
/**
 * Creates a reader over the given stream and registers it in READ_THREADS.
 *
 * @param inputStream       stream the raw bytes are read from
 * @param cvsCommandStopper consulted for abort requests while a consumer waits for data
 */
public ReadThread(InputStream inputStream, ICvsCommandStopper cvsCommandStopper) {
  this.myInputStream = inputStream;
  this.myCvsCommandStopper = cvsCommandStopper;
  // Registration comes last, after both fields are assigned.
  READ_THREADS.add(this);
}
// Public entry point; only the header is part of the visible text, the body is not.
// NOTE(review): presumably blocks on myStarted until the reader has started -- confirm against the full file.
60 public void waitForStart(){
65 public String
toString() {
66 @NonNls StringBuffer buffer
= new StringBuffer();
67 buffer
.append(super.toString());
68 buffer
.append(", atEnd: ");
69 buffer
.append(myAtEndOfStream
);
70 buffer
.append(", firstIndex: ");
71 buffer
.append(myFirstIndex
);
72 buffer
.append(", lastIndex: ");
73 buffer
.append(myLastIndex
);
74 buffer
.append(", exception: ");
75 buffer
.append(myException
);
76 buffer
.append(", closed: ");
77 buffer
.append(myIsClosed
);
78 return buffer
.toString();
// Fragment of the reader loop (presumably Runnable.run(); the method header and several
// statements are not part of the visible text -- confirm against the full file).
// Visible steps, in order: raise the current thread to MAX_PRIORITY, log the start, test
// whether the stream has ended or a failure is stored, read from myInputStream into
// myReadBuffer, pass the byte count to writeAndNotify, test the count against END_OF_STREAM,
// forward failures to detectExceptionAndNotify, and set the priority back to NORM_PRIORITY.
82 Thread
.currentThread().setPriority(Thread
.MAX_PRIORITY
);
84 LOG
.info("Starting CvsReadThread " + this);
// Tests the same two conditions as waitForRead(): end of stream reached or a failure stored.
88 if (myAtEndOfStream
|| (myException
!= null)) {
// result is the value returned by InputStream.read(byte[]) for the scratch array.
92 int result
= myInputStream
.read(myReadBuffer
);
94 writeAndNotify(result
);
96 else if (result
== END_OF_STREAM
) {
101 catch (IOException e
) {
102 detectExceptionAndNotify(e
);
105 catch (Throwable t
) {
// NOTE(review): only the localized message is carried into the new IOException; the
// original Throwable is not attached as its cause.
106 detectExceptionAndNotify(new IOException(t
.getLocalizedMessage()));
112 Thread
.currentThread().setPriority(Thread
.NORM_PRIORITY
);
116 public synchronized int read() throws IOException
{
117 int result
= waitForAvailableBytes();
118 if (result
== END_OF_STREAM
) return END_OF_STREAM
;
119 return internalRead();
122 public synchronized int read(byte b
[], int off
, int len
) throws IOException
{
123 int result
= waitForAvailableBytes();
124 if (result
== END_OF_STREAM
) return END_OF_STREAM
;
125 return internalRead(b
, off
, len
);
128 public synchronized long skip(long n
) throws IOException
{
129 int result
= waitForAvailableBytes();
130 if (result
== END_OF_STREAM
) return END_OF_STREAM
;
131 return internalSkip(n
);
134 public synchronized int available() throws IOException
{
135 if (size() > 0) return size();
136 if (myAtEndOfStream
) return END_OF_STREAM
;
137 return myInputStream
.available();
// Loops while the buffer is empty and the end of the stream has not been reached.
// Several statements of this method (including the call that actually waits and the
// final return) are not part of the visible text -- confirm details against the full file.
// Visible outcomes: returns END_OF_STREAM when the stream has ended with nothing buffered,
// rethrows a stored myException, throws ProcessCanceledException when the command stopper
// reports an abort, and converts InterruptedException into IOException.
140 private int waitForAvailableBytes() throws IOException
{
141 while (size() == 0 && !myAtEndOfStream
) {
146 catch (InterruptedException e
) {
// NOTE(review): only the localized message is carried into the IOException; the
// InterruptedException is not attached as cause and the interrupt flag is not restored here.
147 throw new IOException(e
.getLocalizedMessage());
// Still nothing buffered and not at the end: check whether the command was aborted.
149 if (size() == 0 && !myAtEndOfStream
) {
150 if (myCvsCommandStopper
.isAborted()) {
151 throw new ProcessCanceledException();
// A failure stored in myException is surfaced to the caller here.
155 if (myException
!= null) throw myException
;
156 if (myAtEndOfStream
&& (size() == 0)) {
157 return END_OF_STREAM
;
// Called from the reader-loop catch clauses with the failure to report.
// Only the header and the final executionCompleted() call are part of the visible text.
// NOTE(review): presumably the missing lines store e in myException and wake waiting consumers -- confirm.
162 private synchronized void detectExceptionAndNotify(IOException e
) {
165 executionCompleted();
// Sets myAtEndOfStream to true if it is not already set, then calls executionCompleted().
// Some lines of the conditional body are not part of the visible text.
// NOTE(review): presumably a notify call follows the flag assignment -- confirm.
169 private synchronized void detectEndAndNotify() {
170 if (!myAtEndOfStream
) {
171 myAtEndOfStream
= true;
174 executionCompleted();
// Appends the first `result` bytes of myReadBuffer to myBuffer at myLastIndex and advances
// myLastIndex by the same amount.
// Several statements (before the copy and after the index update) are not part of the
// visible text. NOTE(review): any capacity check, buffer compaction or notify call would be
// in those lines -- confirm against the full file.
177 private synchronized void writeAndNotify(int result
) {
// The method is already declared synchronized, so this block re-acquires the same monitor.
178 synchronized (this) {
183 System
.arraycopy(myReadBuffer
, 0, myBuffer
, myLastIndex
, result
);
184 myLastIndex
+= result
;
// Declared to throw InterruptedException; only the header and one condition are part of
// the visible text. The condition tests whether the end of the stream was reached or a
// failure is stored in myException.
// NOTE(review): what happens in either branch is not visible here -- confirm against the full file.
189 private synchronized void waitForRead() throws InterruptedException
{
191 if (myAtEndOfStream
|| (myException
!= null)) {
197 private void executionCompleted() {
198 READ_THREADS
.remove(this);
199 LOG
.info("Stopping CvsReadThread " + this);
// Number of buffered, not yet consumed bytes: the distance between the write index and the read index.
// NOTE(review): the enclosing method header is not part of the visible text; presumably this
// is the body of size(), which the other methods call -- confirm.
203 return myLastIndex
- myFirstIndex
;
// Marks the reader as being at the end of the stream; does nothing further when that flag
// is already set. Declared to throw IOException.
// Some statements (before the check and after the assignment) are not part of the visible text.
// NOTE(review): presumably myIsClosed is set and the underlying stream is closed there -- confirm.
206 public synchronized void close() throws IOException
{
208 if (myAtEndOfStream
) return;
209 myAtEndOfStream
= true;
213 private synchronized int internalRead() {
215 return (char)myBuffer
[myFirstIndex
++];
218 if (myFirstIndex
> myLastIndex
) {
219 LOG
.assertTrue(false);
// Copies up to len buffered bytes from myBuffer (starting at myFirstIndex) into b at off,
// limited by the number of bytes currently buffered, and advances myFirstIndex by the
// number of bytes copied.
// NOTE(review): the return statement is not part of the visible text; presumably the
// copied count (result) is returned -- confirm against the full file.
224 private synchronized int internalRead(byte b
[], int off
, int len
) {
// Never copy more than what is buffered.
225 int result
= Math
.min(len
, size());
226 System
.arraycopy(myBuffer
, myFirstIndex
, b
, off
, result
);
227 myFirstIndex
+= result
;
231 private long internalSkip(long n
) {
232 long result
= Math
.min(n
, size());
233 myFirstIndex
+= result
;