Fix infrequent hangs in test-runner. (#16793)
[mono-project.git] / mcs / class / referencesource / System / services / monitoring / system / diagnosticts / AsyncStreamReader.cs
blob8260061e8d865cc27b2ce92b9c55cfd29e24b9ea
1 // ==++==
2 //
3 // Copyright (c) Microsoft Corporation. All rights reserved.
4 //
5 // ==--==
6 /*============================================================
7 **
8 ** Class: AsyncStreamReader
9 **
10 ** Purpose: For reading text from streams using a particular
11 ** encoding in an asynchronous manner used by the process class
14 ===========================================================*/
17 namespace System.Diagnostics {
18 using System;
19 using System.IO;
20 using System.Text;
21 using System.Runtime.InteropServices;
22 using System.Threading;
23 using System.Collections;
24 using Microsoft.Win32.SafeHandles;
// Signature of the callback that receives text read by AsyncStreamReader.
// FlushMessageQueue invokes it once per queued entry; ReadBuffer queues a
// null entry after the last line when it reaches end of stream.
26 internal delegate void UserCallBack(String data);
// Reads bytes from a stream with BeginRead/EndRead, decodes them with the
// configured encoding, splits the text into lines and hands each line to a
// UserCallBack delegate.
28 internal class AsyncStreamReader : IDisposable
30 internal const int DefaultBufferSize = 1024; // Byte buffer size
31 private const int MinBufferSize = 128; // Init raises any smaller requested size to this value
// Stream the bytes are read from; set to null by Dispose.
33 private Stream stream;
34 private Encoding encoding;
// Obtained from encoding.GetDecoder() in Init; used by ReadBuffer to convert bytes to chars.
35 private Decoder decoder;
// Target of every BeginRead call.
36 private byte[] byteBuffer;
// Receives the decoded characters of one read before they are appended to sb.
37 private char[] charBuffer;
38 // Record the number of valid bytes in the byteBuffer, for a few checks.
// NOTE(review): no field follows the comment above in this listing; it appears to be a leftover.
40 // This is the maximum number of chars we can get from one call to
41 // ReadBuffer. Used so ReadBuffer can tell when to copy data into
42 // a user's char[] directly, instead of our internal char[].
// NOTE(review): in the visible code this value is only used to size charBuffer in Init.
43 private int _maxCharsPerBuffer;
45 #pragma warning disable 414
46 // Store a backpointer to the process class, to check for user callbacks
47 private Process process;
48 #pragma warning restore
50 // Delegate to call user function.
51 private UserCallBack userCallBack;
53 // Internal Cancel operation
// Set by CancelOperation, cleared by BeginReadLine; while set, FlushMessageQueue
// dequeues entries without invoking userCallBack.
54 private bool cancelOperation;
// Set by ReadBuffer once end of stream has been processed; waited on by WaitUtilEOF.
55 private ManualResetEvent eofEvent;
// Lines waiting to be delivered; the queue object itself is also used as the lock.
56 private Queue messageQueue;
// Decoded text that has not yet been split into complete lines.
// Null until the first BeginReadLine call.
57 private StringBuilder sb;
// True when the previously processed block ended with '\r', so that a '\n'
// at the start of the next block can be treated as part of the same line break.
58 private bool bLastCarriageReturn;
60 // Cache the last position scanned in sb when searching for lines.
61 private int currentLinePos;
62 #if MONO
63 // Used to coordinate between Dispose and ReadBuffer
64 private object syncObject = new Object ();
// Result of the most recent BeginRead; ReadBuffer resets it to null when the read completes.
65 private IAsyncResult asyncReadResult = null;
66 #endif
// Creates a reader that uses DefaultBufferSize for its byte buffer by
// delegating to the five-argument constructor.
68 internal AsyncStreamReader(Process process, Stream stream, UserCallBack callback, Encoding encoding)
69 : this(process, stream, callback, encoding, DefaultBufferSize) {
73 // Creates a new AsyncStreamReader for the given stream. The
74 // character encoding is set by encoding and the size of the byte
75 // read buffer, in bytes, is set by bufferSize (Init raises it to MinBufferSize if smaller).
76 //
77 internal AsyncStreamReader(Process process, Stream stream, UserCallBack callback, Encoding encoding, int bufferSize)
// Argument problems are only reported through Debug.Assert; no exception is thrown here.
79 Debug.Assert (process != null && stream !=null && encoding !=null && callback != null, "Invalid arguments!");
80 Debug.Assert(stream.CanRead, "Stream must be readable!");
81 Debug.Assert(bufferSize > 0, "Invalid buffer size!");
83 Init(process, stream, callback, encoding, bufferSize);
// Queue of pending lines; also used as the lock object that guards it.
84 messageQueue = new Queue();
// Stores the constructor arguments and allocates the decoder, the byte and
// char buffers and the end-of-stream event. The requested buffer size is
// raised to MinBufferSize when it is smaller than that.
private void Init(Process process, Stream stream, UserCallBack callback, Encoding encoding, int bufferSize)
{
    this.process = process;
    this.stream = stream;
    this.encoding = encoding;
    this.userCallBack = callback;
    this.decoder = encoding.GetDecoder();

    // Never read with a byte buffer smaller than the minimum.
    int byteCapacity = bufferSize < MinBufferSize ? MinBufferSize : bufferSize;
    this.byteBuffer = new byte[byteCapacity];

    // The char buffer must be able to hold the decoded form of a full byte buffer.
    this._maxCharsPerBuffer = encoding.GetMaxCharCount(byteCapacity);
    this.charBuffer = new char[this._maxCharsPerBuffer];

    this.cancelOperation = false;
    this.eofEvent = new ManualResetEvent(false);
    this.sb = null;
    this.bLastCarriageReturn = false;
}
// Closes the reader by forwarding to Dispose(true).
103 public virtual void Close()
105 Dispose(true);
// Explicit IDisposable implementation: releases the reader through
// Dispose(true) and then asks the GC to skip finalization for this instance.
108 void IDisposable.Dispose() {
109 Dispose(true);
110 GC.SuppressFinalize(this);
// Releases the stream, the buffers and the end-of-stream event.
//   disposing: when true the underlying stream is closed as well; both callers
//              in this class (Close and IDisposable.Dispose) pass true.
// In MONO builds the work is done while holding syncObject, the same lock
// ReadBuffer takes, and a still-pending asynchronous read is cancelled (where
// the platform supports it) before the stream is closed.
113 protected virtual void Dispose(bool disposing)
115 #if MONO
116 lock (syncObject) {
117 #endif
118 if (disposing) {
119 if (stream != null) {
120 #if MONO
121 if (asyncReadResult != null && !asyncReadResult.IsCompleted) {
122 // Closing underlying stream when having pending async read request in progress is
123 // not portable and racy by design. Try to cancel pending async read before closing stream.
124 // We are still holding lock that will prevent new async read requests to queue up
125 // before we have closed and invalidated the stream.
// Cancellation is only attempted for FileStream, whose SafeFileHandle is passed to MonoIO.Cancel.
126 if (stream is FileStream) {
127 SafeHandle tmpStreamHandle = ((FileStream)stream).SafeFileHandle;
// Keep requesting cancellation until the pending read reports completion.
128 while (!asyncReadResult.IsCompleted) {
129 MonoIOError error;
130 if (!MonoIO.Cancel (tmpStreamHandle, out error) && error == MonoIOError.ERROR_NOT_SUPPORTED) {
131 // Platform don't support canceling pending IO requests on stream. If an async pending read
132 // is still in flight when closing stream, it could trigger a race condition.
133 break;
136 // Wait for a short time for pending async read to cancel/complete/fail.
// The 200 ms timeout bounds each wait so the loop re-checks IsCompleted and retries the cancel.
137 asyncReadResult.AsyncWaitHandle.WaitOne (200);
141 #endif
142 stream.Close();
// Drop the references; ReadBuffer tests stream and decoder for null to detect disposal.
145 if (stream != null) {
146 stream = null;
147 encoding = null;
148 decoder = null;
149 byteBuffer = null;
150 charBuffer = null;
153 if( eofEvent != null) {
154 eofEvent.Close();
155 eofEvent = null;
157 #if MONO
159 #endif
// Encoding passed to the constructor; Dispose resets the field to null.
162 public virtual Encoding CurrentEncoding {
163 get { return encoding; }
// Stream passed to the constructor; Dispose resets the field to null.
166 public virtual Stream BaseStream {
167 get { return stream; }
// Called by the user to start asynchronous line reading.
// The first call allocates the line builder and issues the initial BeginRead
// with ReadBuffer as its completion callback. Later calls only deliver the
// lines that are already queued. A pending cancellation is cleared in both cases.
internal void BeginReadLine()
{
    // Re-enable callback delivery if CancelOperation had suppressed it.
    if (cancelOperation)
    {
        cancelOperation = false;
    }

    if (sb != null)
    {
        // Reading was already started by an earlier call; just drain the queue.
        FlushMessageQueue();
        return;
    }

    sb = new StringBuilder(DefaultBufferSize);
    AsyncCallback onReadCompleted = new AsyncCallback(ReadBuffer);
#if MONO
    // Remember the pending read so Dispose can check and cancel it.
    asyncReadResult = stream.BeginRead(byteBuffer, 0, byteBuffer.Length, onReadCompleted, null);
#else
    stream.BeginRead(byteBuffer, 0, byteBuffer.Length, onReadCompleted, null);
#endif
}
// Suppresses delivery of queued lines: while the flag is set FlushMessageQueue
// still dequeues entries but does not invoke the user callback.
// BeginReadLine clears the flag again.
189 internal void CancelOperation() {
190 cancelOperation = true;
193 // This is the async callback function. Only one thread could/should call this.
// Completes the pending read, then either
//   - treats a zero-length read (or a cancelled/failed read) as end of stream:
//     queues any remaining text plus a null marker, flushes the queue and
//     signals eofEvent; or
//   - decodes the bytes, appends them to sb, extracts complete lines and
//     issues the next BeginRead.
// In MONO builds each access to stream/decoder/eofEvent is made under
// syncObject because Dispose may run concurrently and null those fields.
194 private void ReadBuffer(IAsyncResult ar) {
196 int byteLen;
198 try {
199 #if MONO
200 lock (syncObject) {
201 Debug.Assert (ar.IsCompleted);
// The read has finished; clear the handle so Dispose does not try to cancel it.
202 asyncReadResult = null;
// A null stream means Dispose already ran; report zero bytes, i.e. end of stream.
203 if (this.stream == null)
204 byteLen = 0;
205 else
206 byteLen = stream.EndRead (ar);
208 #else
209 byteLen = stream.EndRead(ar);
210 #endif
212 catch (IOException ) {
213 // We should ideally consume errors from operations getting cancelled
214 // so that we don't crash the unsuspecting parent with an unhandled exc.
215 // This seems to come in 2 forms of exceptions (depending on platform and scenario),
216 // namely OperationCanceledException and IOException (for errorcode that we don't
217 // map explicitly).
218 byteLen = 0; // Treat this as EOF
220 catch (OperationCanceledException ) {
221 // We should consume any OperationCanceledException from child read here
222 // so that we don't crash the parent with an unhandled exc
223 byteLen = 0; // Treat this as EOF
226 #if MONO
// Jump target used below when disposal is detected after EndRead: byteLen is
// forced to 0 first so that the end-of-stream branch is taken.
227 retry_dispose:
228 #endif
229 if (byteLen == 0) {
230 // We're at EOF, we won't call this function again from here on.
231 lock(messageQueue) {
// Text without a trailing line break is still delivered as a final line.
232 if( sb.Length != 0) {
233 messageQueue.Enqueue(sb.ToString());
234 sb.Length = 0;
// A null entry is queued last, after all text has been queued.
236 messageQueue.Enqueue(null);
239 try {
240 // UserCallback could throw, we should still set the eofEvent
241 FlushMessageQueue();
243 finally {
244 #if MONO
245 lock (syncObject) {
246 if (eofEvent != null) {
247 try {
248 eofEvent.Set ();
249 } catch (System.ObjectDisposedException) {
250 // This races with Dispose, it's safe to ignore the error as it comes from a SafeHandle doing its job
254 #else
255 eofEvent.Set();
256 #endif
258 } else {
259 #if MONO
260 lock (syncObject) {
261 if (decoder == null) { //we got disposed after the EndRead, retry as Disposed
262 byteLen = 0;
263 goto retry_dispose;
265 #endif
// Decode this read's bytes and append the resulting chars to the pending text.
266 int charLen = decoder.GetChars(byteBuffer, 0, byteLen, charBuffer, 0);
267 sb.Append(charBuffer, 0, charLen);
268 #if MONO
270 #endif
// Queue every complete line found in sb and deliver the queued lines.
271 GetLinesFromStringBuilder();
272 #if MONO
273 lock (syncObject) {
274 if (stream == null) { //we got disposed after the EndRead, retry as Disposed
275 byteLen = 0;
276 goto retry_dispose;
// Issue the next read and remember it so Dispose can check and cancel it.
278 asyncReadResult = stream.BeginRead (byteBuffer, 0 , byteBuffer.Length, new AsyncCallback(ReadBuffer), null);
280 #else
281 stream.BeginRead(byteBuffer, 0 , byteBuffer.Length, new AsyncCallback(ReadBuffer), null);
282 #endif
// Splits the text accumulated in sb into lines, queues each complete line and
// then delivers the queue through FlushMessageQueue.
// A line is a sequence of characters followed by a carriage return ('\r'),
// a line feed ('\n'), or a carriage return immediately followed by a line
// feed. The queued strings do not contain the terminating characters.
// Text after the last line break stays in sb until more data arrives
// (ReadBuffer queues it as a final line at end of stream).
private void GetLinesFromStringBuilder()
{
    int currentIndex = currentLinePos;
    int lineStart = 0;
    int len = sb.Length;

    // The previous block ended with '\r' (which also emptied sb), so sb[0] is
    // the first character of the new data. A leading '\n' completes that
    // "\r\n" pair and must not produce an extra empty line.
    // The flag is consumed by the first character that arrives, whatever it
    // is; leaving it set would make a later block that begins with '\n' lose
    // a genuine empty line. When nothing was decoded (len == 0) the decision
    // is deferred to the next call.
    if (bLastCarriageReturn && len > 0)
    {
        if (sb[0] == '\n')
        {
            currentIndex = 1;
            lineStart = 1;
        }
        bLastCarriageReturn = false;
    }

    while (currentIndex < len)
    {
        char ch = sb[currentIndex];
        // Note the following common line feed chars:
        // \n - UNIX   \r\n - DOS   \r - Mac
        if (ch == '\r' || ch == '\n')
        {
            string s = sb.ToString(lineStart, currentIndex - lineStart);
            lineStart = currentIndex + 1;
            // Skip the '\n' that immediately follows a '\r'.
            if ((ch == '\r') && (lineStart < len) && (sb[lineStart] == '\n'))
            {
                lineStart++;
                currentIndex++;
            }

            lock (messageQueue)
            {
                messageQueue.Enqueue(s);
            }
        }
        currentIndex++;
    }

    // Guard against an empty builder: a read that delivers only part of a
    // multi-byte character decodes to zero chars, and indexing sb[len - 1]
    // with len == 0 would throw.
    if (len > 0 && sb[len - 1] == '\r')
    {
        bLastCarriageReturn = true;
    }

    // Keep the remaining characters, which do not yet form a line, in the builder.
    if (lineStart < len)
    {
        if (lineStart == 0)
        {
            // No line break was found; cache the scan position so the next
            // call does not rescan from the beginning.
            currentLinePos = currentIndex;
        }
        else
        {
            sb.Remove(0, lineStart);
            currentLinePos = 0;
        }
    }
    else
    {
        sb.Length = 0;
        currentLinePos = 0;
    }

    FlushMessageQueue();
}
// Delivers every queued entry to the user callback, in queue order.
// Both ReadBuffer and BeginReadLine can end up here, so each dequeue (and the
// callback that follows it) happens while holding the messageQueue lock.
private void FlushMessageQueue()
{
    // The unlocked Count test only decides whether to keep looping; it is
    // repeated under the lock before anything is dequeued.
    while (messageQueue.Count > 0)
    {
        lock (messageQueue)
        {
            if (messageQueue.Count == 0)
            {
                // Another caller drained the queue in the meantime.
                continue;
            }

            string line = (string)messageQueue.Dequeue();
            // Skip the callback if the read was cancelled (this may happen
            // inside the callback itself), but keep draining the queue.
            if (!cancelOperation)
            {
                userCallBack(line);
            }
        }
    }
}
// Blocks until ReadBuffer has signalled end of stream, then releases the event.
// Called from Process.WaitForExit; without this wait some output could be lost.
// Does nothing when the event has already been released.
internal void WaitUtilEOF()
{
    if (eofEvent == null)
    {
        return;
    }

    eofEvent.WaitOne();
    eofEvent.Close();
    eofEvent = null;
}