3 using System
.Collections
.Generic
;
4 using System
.Net
.Sockets
;
5 using System
.Runtime
.InteropServices
;
8 public class VxBufferStream
: Stream
10 static WvLog log
= new WvLog("VxBufferStream", WvLog
.L
.Debug4
);
11 private static int streamcount
= 0;
12 public readonly int streamno
= System
.Threading
.Interlocked
.Increment(ref streamcount
);
14 private bool closed
= false;
16 private bool eof
= false;
21 public override bool CanRead { get { return true; }
}
22 public override bool CanWrite { get { return true; }
}
23 public override bool CanSeek { get { return false; }
}
25 public override long Length
{
26 get { throw new NotSupportedException(); }
28 public override long Position
{
29 get { throw new NotSupportedException(); }
30 set { throw new NotSupportedException(); }
33 private object cookie
= null;
34 public object Cookie
{
35 get { return cookie; }
36 set { cookie = value; }
39 public delegate void DataReadyHandler(object sender
, object cookie
);
40 public event DataReadyHandler DataReady
;
41 public event DataReadyHandler NoMoreData
;
43 protected VxNotifySocket sock
;
45 // Maximum value for rbuf_used to take; run the DataReady event when this
47 private long rbuf_size
= 0;
49 protected Buffer rbuf
= new Buffer();
50 protected Buffer wbuf
= new Buffer();
52 public VxBufferStream(VxNotifySocket sock
)
54 sock
.Blocking
= false;
55 sock
.SetSocketOption(SocketOptionLevel
.Socket
,
56 SocketOptionName
.KeepAlive
, true);
57 sock
.ReadReady
+= OnReadable
;
58 sock
.WriteReady
+= OnWritable
;
63 public long BufferAmount
{
64 get { return rbuf_size; }
67 throw new ArgumentOutOfRangeException(
68 "BufferAmount must be nonnegative");
72 if (rbuf
.Size
>= rbuf_size
) {
76 VxEventLoop
.AddAction(new VxEvent(
78 DataReady(this, cookie
);
87 public int BufferPending
{
88 get { return rbuf.Size; }
91 public bool IsDataReady
{
92 get { return rbuf_size <= rbuf.Size; }
95 protected override void Dispose(bool disposing
)
104 WriteWaiting
= false;
116 base.Dispose(disposing
);
119 public override void Flush()
122 throw new ObjectDisposedException("Stream is closed");
128 sock
.Blocking
= true;
130 byte[] buf
= new byte[wbuf
.Size
];
131 wbuf
.Retrieve(buf
, 0, buf
.Length
);
136 int amt
= sock
.Send(buf
, sofar
, buf
.Length
-sofar
, SocketFlags
.None
);
138 // This shouldn't happen, but checking for it guarantees that
139 // this method will complete eventually
141 throw new Exception("Send returned " + amt
);
144 } while (sofar
< buf
.Length
);
146 sock
.Blocking
= false;
150 public override int Read(
151 [InAttribute
] [OutAttribute
] byte[] buffer
,
156 throw new ObjectDisposedException("Stream is closed");
158 return rbuf
.Retrieve(buffer
, offset
, count
);
161 public override int ReadByte()
164 throw new ObjectDisposedException("Stream is closed");
166 return rbuf
.RetrieveByte();
169 public override long Seek(long offset
, SeekOrigin origin
)
171 throw new NotSupportedException();
174 public override void SetLength(long len
)
176 throw new NotSupportedException();
179 public override void Write(byte[] buffer
, int offset
, int count
)
182 throw new ObjectDisposedException("Stream is closed");
184 log
.print("{0} Write\n", streamno
);
185 wbuf
.Append(buffer
, offset
, count
);
187 log
.print("{1} Written {0}\n", wbuf
.Size
, streamno
);
190 public override void WriteByte(byte value)
193 throw new ObjectDisposedException("Stream is closed");
195 wbuf
.AppendByte(value);
199 protected bool read_waiting
= false;
200 protected bool ReadWaiting
{
201 get { return read_waiting; }
203 if (!read_waiting
&& value) {
204 VxEventLoop
.RegisterRead(sock
);
205 } else if (read_waiting
&& !value) {
206 VxEventLoop
.UnregisterRead(sock
);
209 read_waiting
= value;
213 protected bool write_waiting
= false;
214 protected bool WriteWaiting
{
215 get { return write_waiting; }
217 if (!write_waiting
&& value) {
218 VxEventLoop
.RegisterWrite(sock
);
219 } else if (write_waiting
&& !value) {
220 VxEventLoop
.UnregisterWrite(sock
);
223 write_waiting
= value;
227 protected virtual bool OnReadable(object sender
)
230 throw new ObjectDisposedException("Stream is closed");
232 const int READSZ
= 16384;
235 byte[] data
= new byte[READSZ
];
237 while (rbuf
.Size
< rbuf_size
) {
238 log
.print("{1} Attempting receive (available = {0})\n", sock
.Available
, streamno
);
240 int amt
= sock
.Receive(data
);
247 rbuf
.Append(data
, 0, amt
);
249 } catch (SocketException e
) {
250 if (e
.ErrorCode
!= (int)SocketError
.WouldBlock
) {
255 if (rbuf
.Size
>= rbuf_size
) {
256 DataReady(this, cookie
);
260 NoMoreData(this, cookie
);
263 // Don't use ReadWaiting to change this since the return value will
264 // determine the fate in the event loop, far more efficiently than
266 read_waiting
= !eof
&& rbuf
.Size
< rbuf_size
;
270 protected virtual bool OnWritable(object sender
)
273 throw new ObjectDisposedException("Stream is closed");
276 log
.print("{1} Writable {0}\n", wbuf
.Size
, streamno
);
279 byte[] buf
= wbuf
.RetrieveBuf(out offset
, out length
);
282 throw new Exception("Writable handler called with nothing to write");
284 int amt
= sock
.Send(buf
, offset
, length
, SocketFlags
.None
);
287 } catch (SocketException e
) {
288 if (e
.ErrorCode
!= (int)SocketError
.WouldBlock
) {
293 // Don't use WriteWaiting to change this since the return value will
294 // determine the fate in the event loop, far more efficiently than
296 write_waiting
= wbuf
.Size
> 0;
297 return write_waiting
;
300 protected class Buffer
{
301 private const int BUFCHUNKSZ
= 16384;
303 private int buf_start
= 0;
304 private int buf_end
= BUFCHUNKSZ
;
306 private LinkedList
<byte[]> buf
= new LinkedList
<byte[]>();
308 // Number of bytes that can be retrieved
316 return buf
.Count
* BUFCHUNKSZ
+ buf_end
- buf_start
- BUFCHUNKSZ
;
321 // Number of bytes between buf_start and end of first buffer
322 private int FirstUsed
329 return buf_end
- buf_start
;
331 return BUFCHUNKSZ
- buf_start
;
336 // Number of bytes between buf_end and end of last buffer
344 return BUFCHUNKSZ
- buf_end
;
349 public void Append(byte[] buffer
, int offset
, int count
)
351 if (offset
+count
> buffer
.Length
)
352 throw new ArgumentException(
353 "offset+count is larger than buffer length");
355 throw new ArgumentNullException("buffer is null");
357 throw new ArgumentOutOfRangeException("offset is negative");
359 throw new ArgumentOutOfRangeException("count is negative");
364 while (sofar
< count
) {
365 while (LastLeft
== 0)
368 int amt
= Math
.Min(count
- sofar
, LastLeft
);
370 log
.print("Copy buf({3}), {0}, internal, {1}, {2}, count {4}\n", sofar
+offset
, buf_end
, amt
, buffer
.Length
, buf
.Count
);
371 Array
.Copy(buffer
, sofar
+offset
, buf
.Last
.Value
,
379 public void AppendByte(byte data
)
381 while (LastLeft
== 0)
384 buf
.Last
.Value
[buf_end
] = data
;
389 public int Retrieve(byte[] buffer
, int offset
, int count
)
391 if (offset
+count
> buffer
.Length
)
392 throw new ArgumentException(
393 "offset + count larger than buffer length");
395 throw new ArgumentNullException("buffer is null");
397 throw new ArgumentOutOfRangeException("offset is negative");
399 throw new ArgumentOutOfRangeException("count is negative");
407 while (sofar
< count
) {
408 while (FirstUsed
== 0) {
409 // buf.Count > 0 will be true because we know from above
410 // that buffer.Length <= original size. Thus if buf becomes
411 // completely emptied, then sofar >= buffer.Length and
412 // the loop would not have repeated.
416 int amt
= Math
.Min(count
- sofar
, FirstUsed
);
418 Array
.ConstrainedCopy(buf
.First
.Value
, buf_start
, buffer
,
430 public int RetrieveByte()
435 while (FirstUsed
== 0)
438 int result
= buf
.First
.Value
[buf_start
];
446 public byte[] RetrieveBuf(out int offset
, out int count
)
448 while (FirstUsed
== 0) {
449 if (buf
.Count
== 0) {
458 if (buf
.Count
== 0) {
467 return buf
.First
.Value
;
470 public void Discard(int amt
)
473 throw new ArgumentOutOfRangeException("amount is negative");
475 int cursize
= FirstUsed
;
477 while (amt
> cursize
) {
493 buf_end
= BUFCHUNKSZ
;
496 private void OptimizeBuf()
498 // If we finished reading the first buffer, contract it
499 while (buf
.Count
> 1 && FirstUsed
== 0) {
503 // Slight optimization for if the buffer is completely drained
504 // to possibly avoid extra expansions later on
505 if (buf_start
== buf_end
&& buf
.Count
== 1) {
511 private void Expand()
513 buf
.AddLast(new byte[BUFCHUNKSZ
]);
517 private void Contract()