Transport no longer has a Stream member.
[versaplex.git] / versaplexd / vxbufferstream.cs
blobd4c8f3bb2e661d6af329d1647fd8c1f2b860d5a3
1 using System;
2 using System.IO;
3 using System.Collections.Generic;
4 using System.Net.Sockets;
5 using System.Runtime.InteropServices;
6 using Wv;
// A buffered, non-blocking Stream layered on a VxNotifySocket.
//
// Incoming bytes are accumulated in an internal read buffer by OnReadable()
// and handed out by Read()/ReadByte().  Outgoing bytes are queued by
// Write()/WriteByte() and sent by OnWritable(), or synchronously by Flush().
// DataReady is raised once at least BufferAmount bytes are buffered;
// NoMoreData is raised once a receive returns zero bytes.
public class VxBufferStream : Stream
{
    static WvLog log = new WvLog("VxBufferStream", WvLog.L.Debug4);
    private static int streamcount = 0;

    // Per-instance sequence number, used to tell streams apart in log output
    public readonly int streamno = System.Threading.Interlocked.Increment(ref streamcount);

    // Set by Dispose(); every I/O entry point refuses to run afterwards
    private bool closed = false;

    // Set when a receive returned zero bytes
    private bool eof = false;
    public bool Eof {
        get { return eof; }
    }

    public override bool CanRead { get { return true; } }
    public override bool CanWrite { get { return true; } }
    public override bool CanSeek { get { return false; } }

    // Seeking is not supported, so Length and Position always throw
    public override long Length {
        get { throw new NotSupportedException(); }
    }
    public override long Position {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }

    // Opaque caller-supplied value passed back as the second argument of
    // DataReady and NoMoreData
    private object cookie = null;
    public object Cookie {
        get { return cookie; }
        set { cookie = value; }
    }

    public delegate void DataReadyHandler(object sender, object cookie);
    public event DataReadyHandler DataReady;
    public event DataReadyHandler NoMoreData;

    protected VxNotifySocket sock;

    // Maximum value for rbuf_used to take; run the DataReady event when this
    // has filled
    private long rbuf_size = 0;

    protected Buffer rbuf = new Buffer();
    protected Buffer wbuf = new Buffer();

    // Takes over the given socket: switches it to non-blocking mode, enables
    // keep-alive and subscribes to its ReadReady/WriteReady notifications.
    // Throws ArgumentNullException if sock is null.
    public VxBufferStream(VxNotifySocket sock)
    {
        if (sock == null)
            throw new ArgumentNullException("sock");

        sock.Blocking = false;
        sock.SetSocketOption(SocketOptionLevel.Socket,
                SocketOptionName.KeepAlive, true);
        sock.ReadReady += OnReadable;
        sock.WriteReady += OnWritable;

        this.sock = sock;
    }

    // Number of buffered bytes required before DataReady is raised.
    //
    // Assigning a value re-evaluates the current state immediately: when
    // enough data is already buffered, read notifications are switched off
    // and, if the buffer is non-empty, a DataReady notification is handed to
    // VxEventLoop.AddAction(); otherwise read notifications are switched on.
    // Throws ArgumentOutOfRangeException for negative values.
    public long BufferAmount {
        get { return rbuf_size; }
        set {
            if (value < 0)
                throw new ArgumentOutOfRangeException("value",
                        "BufferAmount must be nonnegative");

            rbuf_size = value;

            if (rbuf.Size >= rbuf_size) {
                ReadWaiting = false;

                if (rbuf.Size > 0) {
                    VxEventLoop.AddAction(new VxEvent(
                                delegate() {
                                    RaiseDataReady();
                                }));
                }
            } else {
                ReadWaiting = true;
            }
        }
    }

    // Number of bytes currently held in the read buffer
    public int BufferPending {
        get { return rbuf.Size; }
    }

    // True when at least BufferAmount bytes are buffered
    public bool IsDataReady {
        get { return rbuf_size <= rbuf.Size; }
    }

    // Flushes pending output, drops both buffers, stops event-loop
    // notifications and closes the socket.  Safe to call more than once: the
    // second and later calls only forward to base.Dispose().  If the final
    // Flush() throws, the cleanup still runs before the exception propagates.
    protected override void Dispose(bool disposing)
    {
        try {
            if (disposing && !closed) {
                try {
                    Flush();
                } finally {
                    rbuf.Clear();
                    ReadWaiting = false;

                    wbuf.Clear();
                    WriteWaiting = false;

                    if (sock != null) {
                        // Detach first so the socket no longer references us
                        sock.ReadReady -= OnReadable;
                        sock.WriteReady -= OnWritable;
                        sock.Close();
                        sock = null;
                    }

                    cookie = null;
                }
            }
        } finally {
            closed = true;

            base.Dispose(disposing);
        }
    }

    // Synchronously sends everything queued in the write buffer by
    // temporarily switching the socket to blocking mode.
    // Throws ObjectDisposedException if the stream is closed and IOException
    // if Send reports a non-positive byte count.
    public override void Flush()
    {
        CheckNotClosed();

        if (wbuf.Size == 0)
            return;

        try {
            sock.Blocking = true;

            byte[] pending = new byte[wbuf.Size];
            wbuf.Retrieve(pending, 0, pending.Length);

            int sent = 0;

            do {
                int amt = sock.Send(pending, sent, pending.Length - sent,
                        SocketFlags.None);

                // This shouldn't happen, but checking for it guarantees that
                // this method will complete eventually
                if (amt <= 0)
                    throw new IOException("Send returned " + amt);

                sent += amt;
            } while (sent < pending.Length);
        } finally {
            sock.Blocking = false;
        }

        // Everything has been sent; without this the socket would stay
        // registered for write notifications and OnWritable() would be
        // invoked with an empty buffer.
        WriteWaiting = false;
    }

    // Copies exactly count buffered bytes into buffer at offset.
    // Returns count on success, or -1 (consuming nothing) when fewer than
    // count bytes are currently buffered.
    // NOTE(review): returning -1 instead of a short read differs from the
    // usual Stream.Read contract; kept because callers may rely on it.
    public override int Read(
            [InAttribute] [OutAttribute] byte[] buffer,
            int offset,
            int count)
    {
        CheckNotClosed();

        return rbuf.Retrieve(buffer, offset, count);
    }

    // Returns the next buffered byte, or -1 when the read buffer is empty
    public override int ReadByte()
    {
        CheckNotClosed();

        return rbuf.RetrieveByte();
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    public override void SetLength(long len)
    {
        throw new NotSupportedException();
    }

    // Queues the given bytes for sending and asks for write notifications
    public override void Write(byte[] buffer, int offset, int count)
    {
        CheckNotClosed();

        log.print("{0} Write\n", streamno);
        wbuf.Append(buffer, offset, count);
        WriteWaiting = true;
        log.print("{1} Written {0}\n", wbuf.Size, streamno);
    }

    // Queues a single byte for sending and asks for write notifications
    public override void WriteByte(byte value)
    {
        CheckNotClosed();

        wbuf.AppendByte(value);
        WriteWaiting = true;
    }

    // Whether the socket is registered with VxEventLoop for read
    // notifications; the setter registers/unregisters on each transition.
    protected bool read_waiting = false;
    protected bool ReadWaiting {
        get { return read_waiting; }
        set {
            if (!read_waiting && value) {
                VxEventLoop.RegisterRead(sock);
            } else if (read_waiting && !value) {
                VxEventLoop.UnregisterRead(sock);
            }

            read_waiting = value;
        }
    }

    // Whether the socket is registered with VxEventLoop for write
    // notifications; the setter registers/unregisters on each transition.
    protected bool write_waiting = false;
    protected bool WriteWaiting {
        get { return write_waiting; }
        set {
            if (!write_waiting && value) {
                VxEventLoop.RegisterWrite(sock);
            } else if (write_waiting && !value) {
                VxEventLoop.UnregisterWrite(sock);
            }

            write_waiting = value;
        }
    }

    // ReadReady handler: receives until BufferAmount bytes are buffered, the
    // socket would block, or a receive returns zero bytes.  Raises DataReady
    // and/or NoMoreData as appropriate.  The return value is the new
    // read_waiting state.
    protected virtual bool OnReadable(object sender)
    {
        CheckNotClosed();

        const int READSZ = 16384;

        try {
            byte[] data = new byte[READSZ];

            while (rbuf.Size < rbuf_size) {
                log.print("{1} Attempting receive (available = {0})\n", sock.Available, streamno);

                int amt = sock.Receive(data);

                if (amt == 0) {
                    eof = true;
                    break;
                }

                rbuf.Append(data, 0, amt);
            }
        } catch (SocketException e) {
            // WouldBlock only means there is nothing more to read right now
            if (e.ErrorCode != (int)SocketError.WouldBlock) {
                // Plain rethrow keeps the original stack trace
                throw;
            }
        }

        if (rbuf.Size >= rbuf_size) {
            RaiseDataReady();
        }

        if (eof) {
            RaiseNoMoreData();
        }

        // Don't use ReadWaiting to change this since the return value will
        // determine the fate in the event loop, far more efficiently than
        // generic removal.  A handler above may have disposed the stream, in
        // which case there is nothing left to wait for.
        read_waiting = !closed && !eof && rbuf.Size < rbuf_size;
        return read_waiting;
    }

    // WriteReady handler: sends as much of the first queued chunk as the
    // socket accepts.  The return value is the new write_waiting state
    // (true while data remains queued).
    protected virtual bool OnWritable(object sender)
    {
        CheckNotClosed();

        try {
            log.print("{1} Writable {0}\n", wbuf.Size, streamno);

            int offset, length;
            byte[] chunk = wbuf.RetrieveBuf(out offset, out length);

            if (chunk == null)
                throw new InvalidOperationException(
                        "Writable handler called with nothing to write");

            int amt = sock.Send(chunk, offset, length, SocketFlags.None);

            wbuf.Discard(amt);
        } catch (SocketException e) {
            // WouldBlock only means the socket cannot take more right now
            if (e.ErrorCode != (int)SocketError.WouldBlock) {
                // Plain rethrow keeps the original stack trace
                throw;
            }
        }

        // Don't use WriteWaiting to change this since the return value will
        // determine the fate in the event loop, far more efficiently than
        // generic removal
        write_waiting = wbuf.Size > 0;
        return write_waiting;
    }

    // Throws ObjectDisposedException once the stream has been disposed
    private void CheckNotClosed()
    {
        if (closed)
            throw new ObjectDisposedException("VxBufferStream",
                    "Stream is closed");
    }

    // Raises DataReady if anyone is subscribed
    private void RaiseDataReady()
    {
        DataReadyHandler handler = DataReady;
        if (handler != null)
            handler(this, cookie);
    }

    // Raises NoMoreData if anyone is subscribed
    private void RaiseNoMoreData()
    {
        DataReadyHandler handler = NoMoreData;
        if (handler != null)
            handler(this, cookie);
    }

    // FIFO byte queue stored as a linked list of fixed-size chunks.
    // buf_start is the read offset into the first chunk and buf_end is the
    // write offset into the last chunk.
    protected class Buffer {
        private const int BUFCHUNKSZ = 16384;

        private int buf_start = 0;
        private int buf_end = BUFCHUNKSZ;

        private LinkedList<byte[]> buf = new LinkedList<byte[]>();

        // Number of bytes that can be retrieved
        public int Size
        {
            get {
                if (buf.Count == 0)
                    return 0;

                return (buf.Count - 1) * BUFCHUNKSZ + buf_end - buf_start;
            }
        }

        // Number of bytes between buf_start and end of first buffer
        private int FirstUsed
        {
            get {
                if (buf.Count == 0)
                    return 0;

                if (buf.Count == 1)
                    return buf_end - buf_start;

                return BUFCHUNKSZ - buf_start;
            }
        }

        // Number of bytes between buf_end and end of last buffer
        private int LastLeft
        {
            get {
                if (buf.Count == 0)
                    return 0;

                return BUFCHUNKSZ - buf_end;
            }
        }

        // Validates a (buffer, offset, count) triple.  The null check comes
        // first so that a null buffer yields ArgumentNullException, and the
        // range check is written to avoid overflowing offset+count.
        private static void CheckRange(byte[] buffer, int offset, int count)
        {
            if (buffer == null)
                throw new ArgumentNullException("buffer");
            if (offset < 0)
                throw new ArgumentOutOfRangeException("offset",
                        "offset is negative");
            if (count < 0)
                throw new ArgumentOutOfRangeException("count",
                        "count is negative");
            if (count > buffer.Length - offset)
                throw new ArgumentException(
                        "offset+count is larger than buffer length");
        }

        // Appends count bytes of buffer, starting at offset, to the queue,
        // allocating new chunks as needed
        public void Append(byte[] buffer, int offset, int count)
        {
            CheckRange(buffer, offset, count);

            int copied = 0;

            while (copied < count) {
                while (LastLeft == 0)
                    Expand();

                int amt = Math.Min(count - copied, LastLeft);

                log.print("Copy buf({3}), {0}, internal, {1}, {2}, count {4}\n", copied+offset, buf_end, amt, buffer.Length, buf.Count);
                Array.Copy(buffer, copied + offset, buf.Last.Value,
                        buf_end, amt);

                buf_end += amt;
                copied += amt;
            }
        }

        // Appends a single byte to the queue
        public void AppendByte(byte data)
        {
            while (LastLeft == 0)
                Expand();

            buf.Last.Value[buf_end] = data;

            buf_end++;
        }

        // Removes exactly count bytes from the front of the queue into
        // buffer at offset.  Returns count, or -1 (removing nothing) when
        // fewer than count bytes are queued.
        public int Retrieve(byte[] buffer, int offset, int count)
        {
            CheckRange(buffer, offset, count);

            if (count > Size) {
                return -1;
            }

            int copied = 0;

            while (copied < count) {
                // count <= Size was checked above, so while bytes are still
                // owed there is always another chunk to advance to.
                while (FirstUsed == 0) {
                    Contract();
                }

                int amt = Math.Min(count - copied, FirstUsed);

                Array.ConstrainedCopy(buf.First.Value, buf_start, buffer,
                        copied + offset, amt);

                buf_start += amt;
                copied += amt;
            }

            OptimizeBuf();

            return copied;
        }

        // Removes and returns the first queued byte, or -1 if empty
        public int RetrieveByte()
        {
            if (Size == 0)
                return -1;

            while (FirstUsed == 0)
                Contract();

            int result = buf.First.Value[buf_start];

            buf_start++;
            OptimizeBuf();

            return result;
        }

        // Exposes the readable part of the first chunk without removing it:
        // returns the chunk array and sets offset/count to the readable
        // range.  Returns null (offset = count = 0) when nothing is queued.
        // Follow up with Discard() for the bytes actually consumed.
        public byte[] RetrieveBuf(out int offset, out int count)
        {
            // Drop fully consumed leading chunks
            while (FirstUsed == 0) {
                if (buf.Count == 0) {
                    offset = 0;
                    count = 0;
                    return null;
                }

                Contract();
            }

            // FirstUsed != 0 implies at least one chunk is present
            offset = buf_start;
            count = FirstUsed;

            return buf.First.Value;
        }

        // Drops amt bytes from the front of the queue.
        // Throws ArgumentOutOfRangeException if amt is negative or larger
        // than Size; nothing is removed in that case.
        public void Discard(int amt)
        {
            if (amt < 0)
                throw new ArgumentOutOfRangeException("amt",
                        "amount is negative");
            if (amt > Size)
                throw new ArgumentOutOfRangeException("amt",
                        "amount is larger than buffered size");

            int cursize = FirstUsed;

            while (amt > cursize) {
                Contract();
                amt -= cursize;

                cursize = FirstUsed;
            }

            buf_start += amt;

            OptimizeBuf();
        }

        // Drops all queued data and resets the offsets to their initial values
        public void Clear()
        {
            buf.Clear();
            buf_start = 0;
            buf_end = BUFCHUNKSZ;
        }

        private void OptimizeBuf()
        {
            // If we finished reading the first buffer, contract it
            while (buf.Count > 1 && FirstUsed == 0) {
                Contract();
            }

            // Slight optimization for if the buffer is completely drained
            // to possibly avoid extra expansions later on
            if (buf_start == buf_end && buf.Count == 1) {
                buf_start = 0;
                buf_end = 0;
            }
        }

        // Adds an empty chunk at the tail and points buf_end at its start
        private void Expand()
        {
            buf.AddLast(new byte[BUFCHUNKSZ]);
            buf_end = 0;
        }

        // Removes the head chunk and points buf_start at the next one's start
        private void Contract()
        {
            buf.RemoveFirst();
            buf_start = 0;
        }
    }
}