vxodbc: render timeless datetimes correctly into strings.
[versaplex.git] / wvdotnet / wvstream.cs
blob1bdee4a879c6dff6c79cf22d146412646dec2fec
1 using System;
2 using System.Net;
3 using Wv.Extensions;
5 namespace Wv
7 public interface IWvStream: IDisposable
9 bool ok { get; }
10 Exception err { get; }
11 EndPoint localaddr { get; }
12 EndPoint remoteaddr { get; }
14 int read(WvBytes b);
15 int write(WvBytes b);
16 bool flush(int msec_timeout);
18 event Action onreadable;
19 event Action onwritable;
20 event Action onclose;
22 void close();
23 void noread();
24 void nowrite();
27 public class WvStream : IWvStream
29 public static IWvEventer ev = new WvEventer();
31 public static void runonce()
33 ev.runonce();
36 public static void runonce(int msec_timeout)
38 ev.runonce(msec_timeout);
41 public WvStream()
45 public void Dispose()
47 close();
50 bool is_readable = false, is_writable = false;
51 event Action _onreadable, _onwritable, _onclose;
53 protected bool can_onreadable { get { return _onreadable != null; } }
54 protected bool can_onwritable { get { return _onwritable != null; } }
56 protected void do_readable()
58 if (can_onreadable)
60 is_readable = false;
61 _onreadable();
65 protected void do_writable()
67 if (can_onwritable)
69 is_writable = false;
70 _onwritable();
74 protected void do_close()
76 if (_onclose != null)
77 _onclose();
80 object pr_obj = new object();
81 protected void post_readable()
83 is_readable = true;
84 ev.addpending(pr_obj, do_readable);
87 object pw_obj = new object();
88 protected void post_writable()
90 is_writable = true;
91 ev.addpending(pw_obj, do_writable);
94 public virtual event Action onreadable {
95 add { _onreadable += value; if (is_readable) post_readable(); }
96 remove { _onreadable -= value; }
98 public virtual event Action onwritable {
99 add { _onwritable += value; if (is_writable) post_writable(); }
100 remove { _onwritable -= value; }
102 public virtual event Action onclose {
103 add { _onclose += value; }
104 remove { _onclose -= value; }
107 bool isopen = true;
108 public virtual bool ok { get { return isopen && err == null; } }
110 Exception _err;
111 public virtual Exception err {
112 get {
113 return _err;
115 set {
116 if (_err == null) // remember the *first* error
118 _err = value;
119 close();
124 public virtual void close()
126 ev.delpending(pr_obj);
127 ev.delpending(pw_obj);
128 canread = false;
129 flush(-1);
130 canwrite = false;
131 if (isopen)
133 isopen = false;
134 if (_onclose != null) _onclose();
136 GC.SuppressFinalize(this);
139 public virtual EndPoint localaddr {
140 get { return null; }
143 public virtual EndPoint remoteaddr {
144 get { return null; }
147 public virtual int read(WvBytes b)
149 return 0;
152 // for convenience. Note: always returns non-null, but the returned
153 // array size might be zero.
154 public WvBytes read(int len)
156 WvBytes bytes = new byte[len];
157 int got = read(bytes);
158 return bytes.sub(0, got);
161 public void read(WvBuf b, int max)
163 int got = read(b.alloc(max));
164 b.unalloc(max-got);
167 public virtual int write(WvBytes b)
169 return b.len; // lie: we "wrote" all the bytes to nowhere
173 * Wait up to msec_timeout milliseconds for the stream to become
174 * readable or writable, respectively.
176 * Our default implementation always returns immediately, consistent
177 * with the Unix select() behaviour of returning immediately on
178 * non-select()able file handles.
180 * Returns true if the stream is readable or writable before the
181 * timeout, false otherwise.
183 * Waiting synchronously is usually a bad idea in WvStreams
184 * programs. Use onreadable/onwritable/onclose instead whenever
185 * you can.
187 public virtual bool wait(int msec_timeout,
188 bool readable, bool writable)
190 if (readable && is_readable)
191 return true;
192 if (writable && is_writable)
193 return true;
194 if (!ok || (readable && !canread) || (writable && !canwrite))
195 return false;
196 return true;
199 // Don't make these anything but private! They're tempting, but
200 // they're not *really* what you want to know. Use these instead:
201 // ok -> whether or not your stream is useful at all
202 // wait(0, true, false) -> whether your stream has bytes *right now*.
203 // If canread goes false, wait(-1, true, false) will return false.
204 bool canread = true, canwrite = true;
206 public virtual void noread()
208 canread = false;
209 maybe_autoclose();
212 public virtual void nowrite()
214 canwrite = false;
215 maybe_autoclose();
218 public void maybe_autoclose()
220 if (!canread && !canwrite)
221 close();
224 public virtual bool flush(int msec_timeout)
226 return true; // no buffer
229 // WARNING: assumes the write() will succeed! Use only on WvStreams
230 // with a write buffer.
231 public void print(string fmt, params object[] args)
233 print((object)wv.fmt(fmt, args));
236 // WARNING: assumes the write() will succeed! Use only on WvStreams
237 // with a write buffer.
238 public virtual void print(object o)
240 byte[] b = o.ToUTF8();
241 int n = write(b);
242 wv.assert(n == b.Length,
243 "Don't use print() on an unbuffered WvStream!");
247 // Wraps a WvStream in another WvStream, allowing us to override some
248 // behaviour. By default, a WvStreamClone just passes everything through
249 // to the inner stream.
250 public class WvStreamClone : WvStream
252 protected WvStream inner = null;
253 bool hasinner { get { return inner != null; } }
255 public WvStreamClone(WvStream inner)
257 setinner(inner);
260 public void setinner(WvStream inner)
262 if (inner != this.inner)
264 if (hasinner)
266 this.inner.onreadable -= do_readable;
267 this.inner.onwritable -= do_writable;
268 this.inner.onclose -= do_close;
270 this.inner = inner;
271 if (hasinner)
273 if (can_onreadable) this.inner.onreadable += do_readable;
274 if (can_onwritable) this.inner.onwritable += do_writable;
275 this.inner.onclose += do_close;
280 public override bool ok {
281 get { return base.ok && hasinner && inner.ok; }
284 public override Exception err {
285 get {
286 return hasinner ? inner.err : base.err;
288 set {
289 if (hasinner)
290 inner.err = value;
291 else
292 base.err = value;
296 public override EndPoint localaddr {
297 get { return hasinner ? inner.localaddr : base.localaddr; }
299 public override EndPoint remoteaddr {
300 get { return hasinner ? inner.localaddr : base.localaddr; }
303 public override int read(WvBytes b)
305 if (hasinner)
306 return inner.read(b);
307 else
308 return 0; // 0 bytes read
311 public override int write(WvBytes b)
313 if (hasinner)
314 return inner.write(b);
315 else
316 return 0; // 0 bytes written
319 public override bool wait(int msec_timeout,
320 bool readable, bool writable)
322 if (hasinner)
323 return inner.wait(msec_timeout, readable, writable);
324 else
325 return base.wait(msec_timeout, readable, writable);
328 public override void noread()
330 base.noread();
331 if (hasinner)
332 inner.noread();
335 public override void nowrite()
337 base.nowrite();
338 if (hasinner)
339 inner.nowrite();
342 public override bool flush(int msec_timeout)
344 if (hasinner)
345 return inner.flush(msec_timeout);
346 else
347 return true;
350 // We only want to register our callback with the inner stream if
351 // we *have* a callback, and then only once. Otherwise the stream
352 // might start listening for read when we don't have any readable
353 // handlers, resulting in it spinning forever.
354 public override event Action onreadable {
355 add { if (!can_onreadable) inner.onreadable += do_readable;
356 base.onreadable += value; }
357 remove { base.onreadable -= value;
358 if (!can_onreadable) inner.onreadable -= do_readable; }
360 public override event Action onwritable {
361 add { if (!can_onwritable) inner.onwritable += do_writable;
362 base.onwritable += value; }
363 remove { base.onwritable -= value;
364 if (!can_onwritable) inner.onwritable -= do_writable; }
369 /// Adds an input buffer to a WvStream.
370 public class WvInBufStream : WvStreamClone
372 WvBuf inbuf = new WvBuf();
374 public WvInBufStream(WvStream inner) : base(inner)
378 public override int read(WvBytes b)
380 if (inbuf.used > 0)
382 int max = inbuf.used > b.len ? b.len : inbuf.used;
383 b.put(0, inbuf.get(max));
384 post_readable();
385 return max;
387 else
388 return base.read(b);
391 public string _getline(char splitchar)
393 if (inbuf.strchr(splitchar) > 0)
395 post_readable(); // not stalled yet!
396 return inbuf.get(inbuf.strchr(splitchar)).FromUTF8();
398 return null;
401 public string getline(int msec_timeout, char splitchar)
403 string l = _getline(splitchar);
404 if (l != null) return l;
406 foreach (var remain in wv.until(msec_timeout))
408 if (inner.wait(remain, true, false))
409 inner.read(inbuf, 4096);
411 l = _getline(splitchar);
412 if (l != null) return l;
414 if (inbuf.used == 0 && !ok)
415 break;
418 if (!ok && inbuf.used > 0)
419 return inbuf.getall().FromUTF8();
421 return null;
425 /// Adds an output buffer to a WvStream
426 public class WvOutBufStream : WvStreamClone
428 WvBuf outbuf = new WvBuf();
429 bool writereg = true;
431 public WvOutBufStream(WvStream inner) : base(inner)
435 public override int write(WvBytes b)
437 outbuf.put(b);
438 flush(0);
440 // always succeed
441 return b.len;
444 void _flush()
446 int wrote = base.write(outbuf.peekall());
447 outbuf.get(wrote);
449 if (outbuf.used > 0 && !writereg)
451 inner.onwritable += _flush;
452 writereg = true;
454 else if (outbuf.used == 0 && writereg)
456 inner.onwritable -= _flush;
457 writereg = false;
460 if (outbuf.used == 0)
461 post_writable();
464 public override bool flush(int msec_timeout)
466 foreach (int remain in wv.until(msec_timeout))
468 _flush();
469 if (remain != 0 && outbuf.used > 0)
470 wait(remain, false, true);
471 if (outbuf.used == 0)
472 return true;
474 return false;
479 * A combination of WvInBufStream and WvOutBufStream.
481 * We put the OutBufStream on the inside and the InBufStream on the
482 * outside, because InBufStream actually adds API functions while
483 * OutBufStream doesn't. So getline() will work as expected on this
484 * kind of stream.
486 public class WvBufStream : WvInBufStream
488 public WvBufStream(WvStream inner)
489 : base(new WvOutBufStream(inner))