3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 /*============================================================
8 ** Class: AsyncStreamReader
10 ** Purpose: For reading text from streams using a particular
11 ** encoding in an asynchronous manner; used by the Process class
14 ===========================================================*/
17 namespace System
.Diagnostics
{
21 using System
.Runtime
.InteropServices
;
22 using System
.Threading
;
23 using System
.Collections
;
24 using Microsoft
.Win32
.SafeHandles
;
// Callback signature: receives one string of data and returns nothing.
// AsyncStreamReader stores an instance of this delegate in its userCallBack field.
24 internal delegate void UserCallBack(String data
);
28 internal class AsyncStreamReader
: IDisposable
// Buffer sizing: the default byte-buffer size, and the lower bound that Init clamps bufferSize to.
30 internal const int DefaultBufferSize
= 1024; // Byte buffer size
31 private const int MinBufferSize
= 128;
// Source stream, its text encoding, and the decoder that Init obtains from that encoding.
33 private Stream stream
;
34 private Encoding encoding
;
35 private Decoder decoder
;
// byteBuffer is the target of stream.BeginRead; charBuffer receives the chars decoded from it.
36 private byte[] byteBuffer
;
37 private char[] charBuffer
;
38 // Record the number of valid bytes in the byteBuffer, for a few checks.
// NOTE(review): the field that the comment above describes is not visible in this view.
40 // This is the maximum number of chars we can get from one call to
41 // ReadBuffer. Used so ReadBuffer can tell when to copy data into
42 // a user's char[] directly, instead of our internal char[].
43 private int _maxCharsPerBuffer
;
// Warning 414 is suppressed around the next field declaration only.
45 #pragma warning disable 414
46 // Store a backpointer to the process class, to check for user callbacks
47 private Process process
;
48 #pragma warning restore
50 // Delegate to call user function.
51 private UserCallBack userCallBack
;
53 // Internal Cancel operation
// Set by CancelOperation, cleared by BeginReadLine, and checked by FlushMessageQueue.
54 private bool cancelOperation
;
// Created unsignaled in Init; ReadBuffer's EOF path and WaitUtilEOF both test it for null.
55 private ManualResetEvent eofEvent
;
// Holds completed lines as strings; ReadBuffer enqueues a null entry once EOF is reached.
56 private Queue messageQueue
;
// Accumulates decoded characters that have not yet been split into lines.
57 private StringBuilder sb
;
// True when the previously scanned data ended with '\r' (see GetLinesFromStringBuilder).
58 private bool bLastCarriageReturn
;
60 // Cache the last position scanned in sb when searching for lines.
61 private int currentLinePos
;
63 // Used to coordinate between Dispose and ReadBuffer
64 private object syncObject
= new Object ();
// Result of the most recent BeginRead; cleared at the start of ReadBuffer and inspected by Dispose.
65 private IAsyncResult asyncReadResult
= null;
// Convenience constructor: forwards all four arguments to the five-argument overload,
// supplying DefaultBufferSize as the buffer size.
68 internal AsyncStreamReader(Process process
, Stream stream
, UserCallBack callback
, Encoding encoding
)
69 : this(process
, stream
, callback
, encoding
, DefaultBufferSize
) {
73 // Creates a new AsyncStreamReader for the given stream. The
74 // character encoding is set by encoding and the buffer size,
75 // in number of 16-bit characters, is set by bufferSize.
// NOTE(review): Init allocates a byte[] of bufferSize elements, so bufferSize looks like a
// byte count rather than a character count — confirm which unit callers intend.
77 internal AsyncStreamReader(Process process
, Stream stream
, UserCallBack callback
, Encoding encoding
, int bufferSize
)
// Argument validation is by Debug.Assert only: non-null arguments, a readable stream,
// and a positive buffer size.
79 Debug
.Assert (process
!= null && stream
!=null && encoding
!=null && callback
!= null, "Invalid arguments!");
80 Debug
.Assert(stream
.CanRead
, "Stream must be readable!");
81 Debug
.Assert(bufferSize
> 0, "Invalid buffer size!");
// Field setup is delegated to Init; the message queue is created afterwards.
83 Init(process
, stream
, callback
, encoding
, bufferSize
);
84 messageQueue
= new Queue();
// Stores the constructor arguments in fields and allocates the decoder, the byte and char
// buffers, and the EOF event.
// NOTE(review): no assignment to the `stream` field is visible in this view although other
// members read it — confirm it is set here.
87 private void Init(Process process
, Stream stream
, UserCallBack callback
, Encoding encoding
, int bufferSize
) {
88 this.process
= process
;
90 this.encoding
= encoding
;
91 this.userCallBack
= callback
;
92 decoder
= encoding
.GetDecoder();
// Clamp the requested size so the byte buffer is never smaller than MinBufferSize.
93 if (bufferSize
< MinBufferSize
) bufferSize
= MinBufferSize
;
94 byteBuffer
= new byte[bufferSize
];
// Size the char buffer to the maximum char count the encoding reports for bufferSize bytes.
95 _maxCharsPerBuffer
= encoding
.GetMaxCharCount(bufferSize
);
96 charBuffer
= new char[_maxCharsPerBuffer
];
97 cancelOperation
= false;
// The EOF event starts unsignaled.
98 eofEvent
= new ManualResetEvent(false);
100 this.bLastCarriageReturn
= false;
// Public, overridable close entry point.
// NOTE(review): the method body is not visible in this view — confirm what it forwards to.
103 public virtual void Close()
// Explicit IDisposable.Dispose implementation. The visible statement suppresses finalization
// of this instance.
// NOTE(review): any cleanup call that precedes it is not visible in this view.
108 void IDisposable
.Dispose() {
110 GC
.SuppressFinalize(this);
// Overridable cleanup routine taking the `disposing` flag.
// Visible behaviour: when a stream is present and an async read on it has not completed, and the
// stream is a FileStream, it loops calling MonoIO.Cancel on the stream's SafeFileHandle and waits
// up to 200 ms per iteration until the read completes. It then tests stream and eofEvent for null.
// NOTE(review): the statements that use `disposing`, the lock mentioned in the comments below,
// the exit taken on ERROR_NOT_SUPPORTED, and the actual close/null-out steps are not visible in
// this view — confirm against the full file.
113 protected virtual void Dispose(bool disposing
)
119 if (stream
!= null) {
121 if (asyncReadResult
!= null && !asyncReadResult
.IsCompleted
) {
122 // Closing underlying stream when having pending async read request in progress is
123 // not portable and racy by design. Try to cancel pending async read before closing stream.
124 // We are still holding lock that will prevent new async read requests to queue up
125 // before we have closed and invalidated the stream.
126 if (stream
is FileStream
) {
127 SafeHandle tmpStreamHandle
= ((FileStream
)stream
).SafeFileHandle
;
// Keep trying until the pending read reports completion.
128 while (!asyncReadResult
.IsCompleted
) {
// NOTE(review): `error` is passed as an out argument but its declaration is not visible here.
130 if (!MonoIO
.Cancel (tmpStreamHandle
, out error
) && error
== MonoIOError
.ERROR_NOT_SUPPORTED
) {
131 // Platform doesn't support canceling pending IO requests on stream. If an async pending read
132 // is still in flight when closing stream, it could trigger a race condition.
136 // Wait for a short time for pending async read to cancel/complete/fail.
137 asyncReadResult
.AsyncWaitHandle
.WaitOne (200);
145 if (stream
!= null) {
153 if( eofEvent
!= null) {
// Read accessor returning the `encoding` field.
162 public virtual Encoding CurrentEncoding
{
163 get { return encoding; }
// Read accessor returning the `stream` field.
166 public virtual Stream BaseStream
{
167 get { return stream; }
170 // User calls BeginReadLine to start the asynchronous read
// Visible behaviour: clears a pending cancel flag, allocates the StringBuilder with
// DefaultBufferSize capacity, and issues stream.BeginRead over the whole byteBuffer with
// ReadBuffer as the completion callback.
// NOTE(review): the conditions guarding the StringBuilder allocation and the reads are not
// visible in this view.
171 internal void BeginReadLine() {
172 if( cancelOperation
) {
173 cancelOperation
= false;
177 sb
= new StringBuilder(DefaultBufferSize
);
// NOTE(review): two BeginRead calls appear back-to-back here; presumably they sit in alternate
// conditional-compilation branches whose directives are not visible — confirm. Only the first
// form records the IAsyncResult in asyncReadResult.
179 asyncReadResult
= stream
.BeginRead (byteBuffer
, 0 , byteBuffer
.Length
, new AsyncCallback (ReadBuffer
), null);
181 stream
.BeginRead(byteBuffer
, 0 , byteBuffer
.Length
, new AsyncCallback(ReadBuffer
), null);
// Sets the cancelOperation flag. FlushMessageQueue checks this flag for each dequeued entry,
// and BeginReadLine clears it again.
189 internal void CancelOperation() {
190 cancelOperation
= true;
193 // This is the async callback function. Only one thread could/should call this.
// Completes the pending read with stream.EndRead, decodes the received bytes into sb, splits
// complete lines via GetLinesFromStringBuilder, and issues the next BeginRead. On the EOF path
// it enqueues any leftover text followed by a null entry and then tests eofEvent.
// NOTE(review): the try/lock scaffolding, the byteLen declaration, the EOF test itself and the
// early-exit statements after the null checks are not visible in this view.
194 private void ReadBuffer(IAsyncResult ar
) {
201 Debug
.Assert (ar
.IsCompleted
);
// Clear the stored result for this read before processing it.
202 asyncReadResult
= null;
203 if (this.stream
== null)
// NOTE(review): EndRead appears twice in a row; presumably one call per branch of a
// conditional whose other lines are not visible — confirm it is not called twice on `ar`.
206 byteLen
= stream
.EndRead (ar
);
209 byteLen
= stream
.EndRead(ar
);
212 catch (IOException
) {
213 // We should ideally consume errors from operations getting cancelled
214 // so that we don't crash the unsuspecting parent with an unhandled exc.
215 // This seems to come in 2 forms of exceptions (depending on platform and scenario),
216 // namely OperationCanceledException and IOException (for errorcode that we don't
218 byteLen
= 0; // Treat this as EOF
220 catch (OperationCanceledException
) {
221 // We should consume any OperationCanceledException from child read here
222 // so that we don't crash the parent with an unhandled exc
223 byteLen
= 0; // Treat this as EOF
230 // We're at EOF, we won't call this function again from here on.
// Queue whatever partial line is still buffered, then a null entry.
232 if( sb
.Length
!= 0) {
233 messageQueue
.Enqueue(sb
.ToString());
236 messageQueue
.Enqueue(null);
240 // UserCallback could throw, we should still set the eofEvent
246 if (eofEvent
!= null) {
249 } catch (System
.ObjectDisposedException
) {
250 // This races with Dispose, it's safe to ignore the error as it comes from a SafeHandle doing its job
261 if (decoder
== null) { //we got disposed after the EndRead, retry as Disposed
// Decode the bytes just read and append the resulting chars to the pending text.
266 int charLen
= decoder
.GetChars(byteBuffer
, 0, byteLen
, charBuffer
, 0);
267 sb
.Append(charBuffer
, 0, charLen
);
271 GetLinesFromStringBuilder();
274 if (stream
== null) { //we got disposed after the EndRead, retry as Disposed
// NOTE(review): as in BeginReadLine, two BeginRead calls appear back-to-back; presumably
// alternate conditional-compilation branches — confirm.
278 asyncReadResult
= stream
.BeginRead (byteBuffer
, 0 , byteBuffer
.Length
, new AsyncCallback(ReadBuffer
), null);
281 stream
.BeginRead(byteBuffer
, 0 , byteBuffer
.Length
, new AsyncCallback(ReadBuffer
), null);
287 // Read lines stored in StringBuilder and the buffer we just read into.
288 // A line is defined as a sequence of characters followed by
289 // a carriage return ('\r'), a line feed ('\n'), or a carriage return
290 // immediately followed by a line feed. The resulting string does not
291 // contain the terminating carriage return and/or line feed. The returned
292 // value is null if the end of the input stream has been reached.
// NOTE(review): this method returns void and enqueues each line into messageQueue; the
// "returned value" sentence above does not match the visible signature.
// NOTE(review): `len` and `lineStart` are used below but their declarations are not visible in
// this view, and several branch bodies are missing.
295 private void GetLinesFromStringBuilder() {
// Start from the cached position.
296 int currentIndex
= currentLinePos
;
300 // skip a beginning '\n' character of new block if last block ended
302 if (bLastCarriageReturn
&& (len
> 0) && sb
[0] == '\n')
306 bLastCarriageReturn
= false;
309 while (currentIndex
< len
) {
310 char ch
= sb
[currentIndex
];
311 // Note the following common line feed chars:
312 // \n - UNIX \r\n - DOS \r - Mac
313 if (ch
== '\r' || ch
== '\n') {
// Extract the text between the current line start and the terminator (terminator excluded).
314 string s
= sb
.ToString(lineStart
, currentIndex
- lineStart
);
315 lineStart
= currentIndex
+ 1;
316 // skip the "\n" character following "\r" character
317 if ((ch
== '\r') && (lineStart
< len
) && (sb
[lineStart
] == '\n'))
324 messageQueue
.Enqueue(s
);
// Record whether the buffered text ends with '\r'; the check near the top of this method
// reads the flag on the next call.
329 if (sb
[len
- 1] == '\r') {
330 bLastCarriageReturn
= true;
332 // Keep the remaining characters which can't form a new line in string builder.
333 if (lineStart
< len
) {
334 if (lineStart
== 0) {
335 // we found no breaklines, in this case we cache the position
336 // so next time we don't have to restart from the beginning
337 currentLinePos
= currentIndex
;
// Remove the first lineStart characters from the front of sb.
340 sb
.Remove(0, lineStart
);
// Dequeues entries from messageQueue as strings and checks cancelOperation for each one.
// NOTE(review): the loop and lock constructs, and the statement guarded by the cancelOperation
// check, are not visible in this view — presumably the user callback invocation; confirm.
352 private void FlushMessageQueue() {
355 // When we call BeginReadLine, we also need to flush the queue
356 // So there could be a race between the ReadBuffer and BeginReadLine
357 // We need to take lock before DeQueue.
// The queue count is tested twice; the comments above suggest the second test sits inside the
// lock — NOTE(review): confirm against the full file.
358 if( messageQueue
.Count
> 0) {
360 if( messageQueue
.Count
> 0) {
361 string s
= (string)messageQueue
.Dequeue();
362 // skip if the read is cancelled
363 // this might happen inside UserCallBack
364 // However, continue to drain the queue
365 if (!cancelOperation
)
378 // Wait until we hit EOF. This is called from Process.WaitForExit
379 // We will lose some information if we don't do this.
380 internal void WaitUtilEOF() {
381 if( eofEvent
!= null) {