configure.ac: if neither boost nor tr1 is available, die right away.
[wvstreams.git] / streams / wvstream.cc
blob0e57c13dd323060a10e722920a640d8facfe2492
1 /*
2 * Worldvisions Weaver Software:
3 * Copyright (C) 1997-2002 Net Integration Technologies, Inc.
4 *
5 * Unified support for streams, that is, sequences of bytes that may or
6 * may not be ready for read/write at any given time.
7 *
8 * We provide typical read and write routines, as well as a select() function
9 * for each stream.
11 #include <time.h>
12 #include <sys/types.h>
13 #include <assert.h>
14 #define __WVSTREAM_UNIT_TEST 1
15 #include "wvstream.h"
16 #include "wvtimeutils.h"
17 #include "wvcont.h"
18 #include "wvstreamsdebugger.h"
19 #include "wvstrutils.h"
20 #include "wvistreamlist.h"
21 #include "wvlinkerhack.h"
22 #include "wvmoniker.h"
24 #ifdef _WIN32
25 #define ENOBUFS WSAENOBUFS
26 #undef errno
27 #define errno GetLastError()
28 #ifdef __GNUC__
29 #include <sys/socket.h>
30 #endif
31 #include "streams.h"
32 #else
33 #include <errno.h>
34 #endif
36 #include <map>
38 using std::make_pair;
39 using std::map;
42 // enable this to add some read/write trace messages (this can be VERY
43 // verbose)
44 #if 0
45 # ifndef _MSC_VER
46 # define TRACE(x, y...) fprintf(stderr, x, ## y); fflush(stderr);
47 # else
48 # define TRACE printf
49 # endif
50 #else
51 # ifndef _MSC_VER
52 # define TRACE(x, y...)
53 # else
54 # define TRACE
55 # endif
56 #endif
58 WvStream *WvStream::globalstream = NULL;
60 UUID_MAP_BEGIN(WvStream)
61 UUID_MAP_ENTRY(IObject)
62 UUID_MAP_ENTRY(IWvStream)
63 UUID_MAP_END
66 static map<WSID, WvStream*> *wsid_map;
67 static WSID next_wsid_to_try;
70 WV_LINK(WvStream);
72 static IWvStream *create_null(WvStringParm, IObject *)
74 return new WvStream();
77 static WvMoniker<IWvStream> reg("null", create_null);
80 IWvStream *IWvStream::create(WvStringParm moniker, IObject *obj)
82 IWvStream *s = wvcreate<IWvStream>(moniker, obj);
83 if (!s)
85 s = new WvStream();
86 s->seterr_both(EINVAL, "Unknown moniker '%s'", moniker);
87 WVRELEASE(obj); // we're not going to use it after all
89 return s;
93 static bool is_prefix_insensitive(const char *str, const char *prefix)
95 size_t len = strlen(prefix);
96 return strlen(str) >= len && strncasecmp(str, prefix, len) == 0;
100 static const char *strstr_insensitive(const char *haystack, const char *needle)
102 while (*haystack != '\0')
104 if (is_prefix_insensitive(haystack, needle))
105 return haystack;
106 ++haystack;
108 return NULL;
112 static bool contains_insensitive(const char *haystack, const char *needle)
114 return strstr_insensitive(haystack, needle) != NULL;
118 static const char *list_format = "%6s%s%2s%s%3s%s%3s%s%6s%s%20s%s%s";
119 static inline const char *Yes_No(bool val)
121 return val? "Yes": "No";
125 void WvStream::debugger_streams_display_header(WvStringParm cmd,
126 WvStreamsDebugger::ResultCallback result_cb)
128 WvStringList result;
129 result.append(list_format, "--WSID", "-", "RC", "-", "-Ok", "-", "-Cs", "-", "-AlRem", "-",
130 "----------------Type", "-", "Name--------------------");
131 result_cb(cmd, result);
135 // Set to fit in 6 chars
136 static WvString friendly_ms(time_t ms)
138 if (ms <= 0)
139 return WvString("(%s)", ms);
140 else if (ms < 1000)
141 return WvString("%sms", ms);
142 else if (ms < 60*1000)
143 return WvString("%ss", ms/1000);
144 else if (ms < 60*60*1000)
145 return WvString("%sm", ms/(60*1000));
146 else if (ms <= 24*60*60*1000)
147 return WvString("%sh", ms/(60*60*1000));
148 else
149 return WvString("%sd", ms/(24*60*60*1000));
152 void WvStream::debugger_streams_display_one_stream(WvStream *s,
153 WvStringParm cmd,
154 WvStreamsDebugger::ResultCallback result_cb)
156 WvStringList result;
157 s->addRef();
158 unsigned refcount = s->release();
159 result.append(list_format,
160 s->wsid(), " ",
161 refcount, " ",
162 Yes_No(s->isok()), " ",
163 Yes_No(s->uses_continue_select), " ",
164 friendly_ms(s->alarm_remaining()), " ",
165 s->wstype(), " ",
166 s->wsname());
167 result_cb(cmd, result);
171 void WvStream::debugger_streams_maybe_display_one_stream(WvStream *s,
172 WvStringParm cmd,
173 const WvStringList &args,
174 WvStreamsDebugger::ResultCallback result_cb)
176 bool show = args.isempty();
177 WvStringList::Iter arg(args);
178 for (arg.rewind(); arg.next(); )
180 WSID wsid;
181 bool is_num = wvstring_to_num(*arg, wsid);
183 if (is_num)
185 if (s->wsid() == wsid)
187 show = true;
188 break;
191 else
193 if (s->wsname() && contains_insensitive(s->wsname(), *arg)
194 || s->wstype() && contains_insensitive(s->wstype(), *arg))
196 show = true;
197 break;
201 if (show)
202 debugger_streams_display_one_stream(s, cmd, result_cb);
206 WvString WvStream::debugger_streams_run_cb(WvStringParm cmd,
207 WvStringList &args,
208 WvStreamsDebugger::ResultCallback result_cb, void *)
210 debugger_streams_display_header(cmd, result_cb);
211 if (wsid_map)
213 map<WSID, WvStream*>::iterator it;
215 for (it = wsid_map->begin(); it != wsid_map->end(); ++it)
216 debugger_streams_maybe_display_one_stream(it->second, cmd, args,
217 result_cb);
220 return WvString::null;
224 WvString WvStream::debugger_close_run_cb(WvStringParm cmd,
225 WvStringList &args,
226 WvStreamsDebugger::ResultCallback result_cb, void *)
228 if (args.isempty())
229 return WvString("Usage: %s <WSID>", cmd);
230 WSID wsid;
231 WvString wsid_str = args.popstr();
232 if (!wvstring_to_num(wsid_str, wsid))
233 return WvString("Invalid WSID '%s'", wsid_str);
234 IWvStream *s = WvStream::find_by_wsid(wsid);
235 if (!s)
236 return WvString("No such stream");
237 s->close();
238 return WvString::null;
242 void WvStream::add_debugger_commands()
244 WvStreamsDebugger::add_command("streams", 0, debugger_streams_run_cb, 0);
245 WvStreamsDebugger::add_command("close", 0, debugger_close_run_cb, 0);
249 WvStream::WvStream():
250 read_requires_writable(NULL),
251 write_requires_readable(NULL),
252 uses_continue_select(false),
253 personal_stack_size(131072),
254 alarm_was_ticking(false),
255 stop_read(false),
256 stop_write(false),
257 closed(false),
258 readcb(wv::bind(&WvStream::legacy_callback, this)),
259 max_outbuf_size(0),
260 outbuf_delayed_flush(false),
261 is_auto_flush(true),
262 want_to_flush(true),
263 is_flushing(false),
264 queue_min(0),
265 autoclose_time(0),
266 alarm_time(wvtime_zero),
267 last_alarm_check(wvtime_zero)
269 TRACE("Creating wvstream %p\n", this);
271 static bool first = true;
272 if (first)
274 first = false;
275 WvStream::add_debugger_commands();
278 // Choose a wsid;
279 if (!wsid_map)
280 wsid_map = new map<WSID, WvStream*>;
281 WSID first_wsid_tried = next_wsid_to_try;
284 if (wsid_map->find(next_wsid_to_try) == wsid_map->end())
285 break;
286 ++next_wsid_to_try;
287 } while (next_wsid_to_try != first_wsid_tried);
288 my_wsid = next_wsid_to_try++;
289 bool inserted = wsid_map->insert(make_pair(my_wsid, this)).second;
290 assert(inserted);
292 #ifdef _WIN32
293 WSAData wsaData;
294 int result = WSAStartup(MAKEWORD(2,0), &wsaData);
295 assert(result == 0);
296 #endif
300 // FIXME: interfaces (IWvStream) shouldn't have implementations!
301 IWvStream::IWvStream()
306 IWvStream::~IWvStream()
311 WvStream::~WvStream()
313 TRACE("destroying %p\n", this);
314 close();
316 // if this assertion fails, then uses_continue_select is true, but you
317 // didn't call terminate_continue_select() or close() before destroying
318 // your object. Shame on you!
319 assert(!uses_continue_select || !call_ctx);
321 call_ctx = 0; // finish running the suspended callback, if any
323 assert(wsid_map);
324 wsid_map->erase(my_wsid);
325 if (wsid_map->empty())
327 delete wsid_map;
328 wsid_map = NULL;
331 // eventually, streams will auto-add themselves to the globallist. But
332 // even before then, it'll never be useful for them to be on the
333 // globallist *after* they get destroyed, so we might as well auto-remove
334 // them already. It's harmless for people to try to remove them twice.
335 WvIStreamList::globallist.unlink(this);
337 TRACE("done destroying %p\n", this);
341 void WvStream::close()
343 TRACE("flushing in wvstream...\n");
344 flush(2000); // fixme: should not hardcode this stuff
345 TRACE("(flushed)\n");
347 closed = true;
349 if (!!closecb)
351 IWvStreamCallback cb = closecb;
352 closecb = 0; // ensure callback is only called once
353 cb();
356 // I would like to delete call_ctx here, but then if someone calls
357 // close() from *inside* a continuable callback, we explode. Oops!
358 //call_ctx = 0; // destroy the context, if necessary
362 void WvStream::autoforward(WvStream &s)
364 setcallback(wv::bind(autoforward_callback, wv::ref(*this), wv::ref(s)));
365 read_requires_writable = &s;
369 void WvStream::noautoforward()
371 setcallback(0);
372 read_requires_writable = NULL;
376 void WvStream::autoforward_callback(WvStream &input, WvStream &output)
378 char buf[1024];
379 size_t len;
381 len = input.read(buf, sizeof(buf));
382 output.write(buf, len);
386 void WvStream::_callback()
388 execute();
389 if (!!callfunc)
390 callfunc();
394 void *WvStream::_callwrap(void *)
396 _callback();
397 return NULL;
401 void WvStream::callback()
403 TRACE("(?)");
405 // if the alarm has gone off and we're calling callback... good!
406 if (alarm_remaining() == 0)
408 alarm_time = wvtime_zero;
409 alarm_was_ticking = true;
411 else
412 alarm_was_ticking = false;
414 assert(!uses_continue_select || personal_stack_size >= 1024);
416 #define TEST_CONTINUES_HARSHLY 0
417 #if TEST_CONTINUES_HARSHLY
418 #ifndef _WIN32
419 # warning "Using WvCont for *all* streams for testing!"
420 #endif
421 if (1)
422 #else
423 if (uses_continue_select && personal_stack_size >= 1024)
424 #endif
426 if (!call_ctx) // no context exists yet!
428 call_ctx = WvCont(wv::bind(&WvStream::_callwrap, this, _1),
429 personal_stack_size);
432 call_ctx(NULL);
434 else
435 _callback();
437 // if this assertion fails, a derived class's virtual execute() function
438 // didn't call its parent's execute() function, and we didn't make it
439 // all the way back up to WvStream::execute(). This doesn't always
440 // matter right now, but it could lead to obscure bugs later, so we'll
441 // enforce it.
445 bool WvStream::isok() const
447 return !closed && WvErrorBase::isok();
451 void WvStream::seterr(int _errnum)
453 if (!geterr()) // no pre-existing error
455 WvErrorBase::seterr(_errnum);
456 close();
461 size_t WvStream::read(WvBuf &outbuf, size_t count)
463 // for now, just wrap the older read function
464 size_t free = outbuf.free();
465 if (count > free)
466 count = free;
468 WvDynBuf tmp;
469 unsigned char *buf = tmp.alloc(count);
470 size_t len = read(buf, count);
471 tmp.unalloc(count - len);
472 outbuf.merge(tmp);
473 return len;
477 size_t WvStream::write(WvBuf &inbuf, size_t count)
479 // for now, just wrap the older write function
480 size_t avail = inbuf.used();
481 if (count > avail)
482 count = avail;
483 const unsigned char *buf = inbuf.get(count);
484 size_t len = write(buf, count);
485 inbuf.unget(count - len);
486 return len;
490 size_t WvStream::read(void *buf, size_t count)
492 assert(!count || buf);
494 size_t bufu, i;
495 unsigned char *newbuf;
497 bufu = inbuf.used();
498 if (bufu < queue_min)
500 newbuf = inbuf.alloc(queue_min - bufu);
501 assert(newbuf);
502 i = uread(newbuf, queue_min - bufu);
503 inbuf.unalloc(queue_min - bufu - i);
505 bufu = inbuf.used();
508 if (bufu < queue_min)
510 maybe_autoclose();
511 return 0;
514 // if buffer is empty, do a hard read
515 if (!bufu)
516 bufu = uread(buf, count);
517 else
519 // otherwise just read from the buffer
520 if (bufu > count)
521 bufu = count;
523 memcpy(buf, inbuf.get(bufu), bufu);
526 TRACE("read obj 0x%08x, bytes %d/%d\n", (unsigned int)this, bufu, count);
527 maybe_autoclose();
528 return bufu;
532 size_t WvStream::write(const void *buf, size_t count)
534 assert(!count || buf);
535 if (!isok() || !buf || !count || stop_write) return 0;
537 size_t wrote = 0;
538 if (!outbuf_delayed_flush && !outbuf.used())
540 wrote = uwrite(buf, count);
541 count -= wrote;
542 buf = (const unsigned char *)buf + wrote;
543 // if (!count) return wrote; // short circuit if no buffering needed
545 if (max_outbuf_size != 0)
547 size_t canbuffer = max_outbuf_size - outbuf.used();
548 if (count > canbuffer)
549 count = canbuffer; // can't write the whole amount
551 if (count != 0)
553 outbuf.put(buf, count);
554 wrote += count;
557 if (should_flush())
559 if (is_auto_flush)
560 flush(0);
561 else
562 flush_outbuf(0);
565 return wrote;
569 void WvStream::noread()
571 stop_read = true;
572 maybe_autoclose();
576 void WvStream::nowrite()
578 stop_write = true;
579 maybe_autoclose();
583 void WvStream::maybe_autoclose()
585 if (stop_read && stop_write && !outbuf.used() && !inbuf.used() && !closed)
586 close();
590 bool WvStream::isreadable()
592 return isok() && select(0, true, false, false);
596 bool WvStream::iswritable()
598 return !stop_write && isok() && select(0, false, true, false);
602 char *WvStream::blocking_getline(time_t wait_msec, int separator,
603 int readahead)
605 assert(separator >= 0);
606 assert(separator <= 255);
608 //assert(uses_continue_select || wait_msec == 0);
610 WvTime timeout_time(0);
611 if (wait_msec > 0)
612 timeout_time = msecadd(wvtime(), wait_msec);
614 maybe_autoclose();
616 // if we get here, we either want to wait a bit or there is data
617 // available.
618 while (isok())
620 // fprintf(stderr, "(inbuf used = %d)\n", inbuf.used()); fflush(stderr);
621 queuemin(0);
623 // if there is a newline already, we have enough data.
624 if (inbuf.strchr(separator) > 0)
625 break;
626 else if (!isok() || stop_read) // uh oh, stream is in trouble.
627 break;
629 // make select not return true until more data is available
630 queuemin(inbuf.used() + 1);
632 // compute remaining timeout
633 if (wait_msec > 0)
635 wait_msec = msecdiff(timeout_time, wvtime());
636 if (wait_msec < 0)
637 wait_msec = 0;
640 // FIXME: this is blocking_getline. It shouldn't
641 // call continue_select()!
642 bool hasdata;
643 if (wait_msec != 0 && uses_continue_select)
644 hasdata = continue_select(wait_msec);
645 else
646 hasdata = select(wait_msec, true, false);
648 if (!isok())
649 break;
651 if (hasdata)
653 // read a few bytes
654 WvDynBuf tmp;
655 unsigned char *buf = tmp.alloc(readahead);
656 assert(buf);
657 size_t len = uread(buf, readahead);
658 tmp.unalloc(readahead - len);
659 inbuf.put(tmp.get(len), len);
660 hasdata = len > 0; // enough?
663 if (!isok())
664 break;
666 if (!hasdata && wait_msec == 0)
667 return NULL; // handle timeout
669 if (!inbuf.used())
670 return NULL;
672 // return the appropriate data
673 size_t i = 0;
674 i = inbuf.strchr(separator);
675 if (i > 0) {
676 char *eol = (char *)inbuf.mutablepeek(i - 1, 1);
677 assert(eol && *eol == separator);
678 *eol = 0;
679 return const_cast<char*>((const char *)inbuf.get(i));
680 } else {
681 // handle "EOF without newline" condition
682 // FIXME: it's very silly that buffers can't return editable
683 // char* arrays.
684 inbuf.alloc(1)[0] = 0; // null-terminate it
685 return const_cast<char *>((const char *)inbuf.get(inbuf.used()));
690 char *WvStream::continue_getline(time_t wait_msec, int separator,
691 int readahead)
693 assert(false && "not implemented, come back later!");
694 assert(uses_continue_select);
695 return NULL;
699 void WvStream::drain()
701 char buf[1024];
702 while (isreadable())
703 read(buf, sizeof(buf));
707 bool WvStream::flush(time_t msec_timeout)
709 if (is_flushing) return false;
711 TRACE("%p flush starts\n", this);
713 is_flushing = true;
714 want_to_flush = true;
715 bool done = flush_internal(msec_timeout) // any other internal buffers
716 && flush_outbuf(msec_timeout); // our own outbuf
717 is_flushing = false;
719 TRACE("flush stops (%d)\n", done);
720 return done;
724 bool WvStream::should_flush()
726 return want_to_flush;
730 bool WvStream::flush_outbuf(time_t msec_timeout)
732 TRACE("%p flush_outbuf starts (isok=%d)\n", this, isok());
733 bool outbuf_was_used = outbuf.used();
735 // do-nothing shortcut for speed
736 // FIXME: definitely makes a "measurable" difference...
737 // but is it worth the risk?
738 if (!outbuf_was_used && !autoclose_time && !outbuf_delayed_flush)
740 maybe_autoclose();
741 return true;
744 WvTime stoptime = msecadd(wvtime(), msec_timeout);
746 // flush outbuf
747 while (outbuf_was_used && isok())
749 // fprintf(stderr, "%p: fd:%d/%d, used:%d\n",
750 // this, getrfd(), getwfd(), outbuf.used());
752 size_t attempt = outbuf.optgettable();
753 size_t real = uwrite(outbuf.get(attempt), attempt);
755 // WARNING: uwrite() may have messed up our outbuf!
756 // This probably only happens if uwrite() closed the stream because
757 // of an error, so we'll check isok().
758 if (isok() && real < attempt)
760 TRACE("flush_outbuf: unget %d-%d\n", attempt, real);
761 assert(outbuf.ungettable() >= attempt - real);
762 outbuf.unget(attempt - real);
765 // since post_select() can call us, and select() calls post_select(),
766 // we need to be careful not to call select() if we don't need to!
767 // post_select() will only call us with msec_timeout==0, and we don't
768 // need to do select() in that case anyway.
769 if (!msec_timeout)
770 break;
771 if (msec_timeout >= 0
772 && (stoptime < wvtime() || !select(msec_timeout, false, true)))
773 break;
775 outbuf_was_used = outbuf.used();
778 // handle autoclose
779 if (autoclose_time && isok())
781 time_t now = time(NULL);
782 TRACE("Autoclose enabled for 0x%p - now-time=%ld, buf %d bytes\n",
783 this, now - autoclose_time, outbuf.used());
784 if ((flush_internal(0) && !outbuf.used()) || now > autoclose_time)
786 autoclose_time = 0; // avoid infinite recursion!
787 close();
791 TRACE("flush_outbuf: after autoclose chunk\n");
792 if (outbuf_delayed_flush && !outbuf_was_used)
793 want_to_flush = false;
795 TRACE("flush_outbuf: now isok=%d\n", isok());
797 // if we can't flush the outbuf, at least empty it!
798 if (outbuf_was_used && !isok())
799 outbuf.zap();
801 maybe_autoclose();
802 TRACE("flush_outbuf stops\n");
804 return !outbuf_was_used;
808 bool WvStream::flush_internal(time_t msec_timeout)
810 // once outbuf emptied, that's it for most streams
811 return true;
815 int WvStream::getrfd() const
817 return -1;
821 int WvStream::getwfd() const
823 return -1;
827 void WvStream::flush_then_close(int msec_timeout)
829 time_t now = time(NULL);
830 autoclose_time = now + (msec_timeout + 999) / 1000;
832 TRACE("Autoclose SETUP for 0x%p - buf %d bytes, timeout %ld sec\n",
833 this, outbuf.used(), autoclose_time - now);
835 // as a fast track, we _could_ close here: but that's not a good idea,
836 // since flush_then_close() deals with obscure situations, and we don't
837 // want the caller to use it incorrectly. So we make things _always_
838 // break when the caller forgets to call select() later.
840 flush(0);
844 void WvStream::pre_select(SelectInfo &si)
846 maybe_autoclose();
848 time_t alarmleft = alarm_remaining();
850 if (!isok() || (!si.inherit_request && alarmleft == 0))
852 si.msec_timeout = 0;
853 return; // alarm has rung
856 if (!si.inherit_request)
858 si.wants.readable |= static_cast<bool>(readcb);
859 si.wants.writable |= static_cast<bool>(writecb);
860 si.wants.isexception |= static_cast<bool>(exceptcb);
863 // handle read-ahead buffering
864 if (si.wants.readable && inbuf.used() && inbuf.used() >= queue_min)
866 si.msec_timeout = 0; // already ready
867 return;
869 if (alarmleft >= 0
870 && (alarmleft < si.msec_timeout || si.msec_timeout < 0))
871 si.msec_timeout = alarmleft + 10;
875 bool WvStream::post_select(SelectInfo &si)
877 if (!si.inherit_request)
879 si.wants.readable |= static_cast<bool>(readcb);
880 si.wants.writable |= static_cast<bool>(writecb);
881 si.wants.isexception |= static_cast<bool>(exceptcb);
884 // FIXME: need sane buffer flush support for non FD-based streams
885 // FIXME: need read_requires_writable and write_requires_readable
886 // support for non FD-based streams
888 // note: flush(nonzero) might call select(), but flush(0) never does,
889 // so this is safe.
890 if (should_flush())
891 flush(0);
892 if (!si.inherit_request && alarm_remaining() == 0)
893 return true; // alarm ticked
894 if ((si.wants.readable || (!si.inherit_request && readcb))
895 && inbuf.used() && inbuf.used() >= queue_min)
896 return true; // already ready
897 return false;
901 void WvStream::_build_selectinfo(SelectInfo &si, time_t msec_timeout,
902 bool readable, bool writable, bool isexcept, bool forceable)
904 FD_ZERO(&si.read);
905 FD_ZERO(&si.write);
906 FD_ZERO(&si.except);
908 if (forceable)
910 si.wants.readable = readcb;
911 si.wants.writable = writecb;
912 si.wants.isexception = exceptcb;
914 else
916 si.wants.readable = readable;
917 si.wants.writable = writable;
918 si.wants.isexception = isexcept;
921 si.max_fd = -1;
922 si.msec_timeout = msec_timeout;
923 si.inherit_request = ! forceable;
924 si.global_sure = false;
926 wvstime_sync();
928 pre_select(si);
929 if (globalstream && forceable && (globalstream != this))
931 WvStream *s = globalstream;
932 globalstream = NULL; // prevent recursion
933 s->xpre_select(si, SelectRequest(false, false, false));
934 globalstream = s;
939 int WvStream::_do_select(SelectInfo &si)
941 // prepare timeout
942 timeval tv;
943 tv.tv_sec = si.msec_timeout / 1000;
944 tv.tv_usec = (si.msec_timeout % 1000) * 1000;
946 #ifdef _WIN32
947 // selecting on an empty set of sockets doesn't cause a delay in win32.
948 SOCKET fakefd = socket(PF_INET, SOCK_STREAM, 0);
949 FD_SET(fakefd, &si.except);
950 #endif
952 // block
953 int sel = ::select(si.max_fd+1, &si.read, &si.write, &si.except,
954 si.msec_timeout >= 0 ? &tv : (timeval*)NULL);
956 // handle errors.
957 // EAGAIN and EINTR don't matter because they're totally normal.
958 // ENOBUFS is hopefully transient.
959 // EBADF is kind of gross and might imply that something is wrong,
960 // but it happens sometimes...
961 if (sel < 0
962 && errno != EAGAIN && errno != EINTR
963 && errno != EBADF
964 && errno != ENOBUFS
967 seterr(errno);
969 #ifdef _WIN32
970 ::close(fakefd);
971 #endif
972 TRACE("select() returned %d\n", sel);
973 return sel;
977 bool WvStream::_process_selectinfo(SelectInfo &si, bool forceable)
979 // We cannot move the clock backward here, because timers that
980 // were expired in pre_select could then not be expired anymore,
981 // and while time going backward is rather unsettling in general,
982 // for it to be happening between pre_select and post_select is
983 // just outright insanity.
984 wvstime_sync_forward();
986 bool sure = post_select(si);
987 if (globalstream && forceable && (globalstream != this))
989 WvStream *s = globalstream;
990 globalstream = NULL; // prevent recursion
991 si.global_sure = s->xpost_select(si, SelectRequest(false, false, false))
992 || si.global_sure;
993 globalstream = s;
995 return sure;
999 bool WvStream::_select(time_t msec_timeout, bool readable, bool writable,
1000 bool isexcept, bool forceable)
1002 // Detect use of deleted stream
1003 assert(wsid_map && (wsid_map->find(my_wsid) != wsid_map->end()));
1005 SelectInfo si;
1006 _build_selectinfo(si, msec_timeout, readable, writable, isexcept,
1007 forceable);
1009 bool sure = false;
1010 int sel = _do_select(si);
1011 if (sel >= 0)
1012 sure = _process_selectinfo(si, forceable);
1013 if (si.global_sure && globalstream && forceable && (globalstream != this))
1014 globalstream->callback();
1016 return sure;
1020 IWvStream::SelectRequest WvStream::get_select_request()
1022 return IWvStream::SelectRequest(readcb, writecb, exceptcb);
1026 void WvStream::force_select(bool readable, bool writable, bool isexception)
1028 if (readable)
1029 readcb = wv::bind(&WvStream::legacy_callback, this);
1030 if (writable)
1031 writecb = wv::bind(&WvStream::legacy_callback, this);
1032 if (isexception)
1033 exceptcb = wv::bind(&WvStream::legacy_callback, this);
1037 void WvStream::undo_force_select(bool readable, bool writable, bool isexception)
1039 if (readable)
1040 readcb = 0;
1041 if (writable)
1042 writecb = 0;
1043 if (isexception)
1044 exceptcb = 0;
1048 void WvStream::alarm(time_t msec_timeout)
1050 if (msec_timeout >= 0)
1051 alarm_time = msecadd(wvstime(), msec_timeout);
1052 else
1053 alarm_time = wvtime_zero;
1057 time_t WvStream::alarm_remaining()
1059 if (alarm_time.tv_sec)
1061 WvTime now = wvstime();
1063 // Time is going backward!
1064 if (now < last_alarm_check)
1066 #if 0 // okay, I give up. Time just plain goes backwards on some systems.
1067 // warn only if it's a "big" difference (sigh...)
1068 if (msecdiff(last_alarm_check, now) > 200)
1069 fprintf(stderr, " ************* TIME WENT BACKWARDS! "
1070 "(%ld:%ld %ld:%ld)\n",
1071 last_alarm_check.tv_sec, last_alarm_check.tv_usec,
1072 now.tv_sec, now.tv_usec);
1073 #endif
1074 alarm_time = tvdiff(alarm_time, tvdiff(last_alarm_check, now));
1077 last_alarm_check = now;
1079 time_t remaining = msecdiff(alarm_time, now);
1080 if (remaining < 0)
1081 remaining = 0;
1082 return remaining;
1084 return -1;
1088 bool WvStream::continue_select(time_t msec_timeout)
1090 assert(uses_continue_select);
1092 // if this assertion triggers, you probably tried to do continue_select()
1093 // while inside terminate_continue_select().
1094 assert(call_ctx);
1096 if (msec_timeout >= 0)
1097 alarm(msec_timeout);
1099 alarm(msec_timeout);
1100 WvCont::yield();
1101 alarm(-1); // cancel the still-pending alarm, or it might go off later!
1103 // when we get here, someone has jumped back into our task.
1104 // We have to select(0) here because it's possible that the alarm was
1105 // ticking _and_ data was available. This is aggravated especially if
1106 // msec_delay was zero. Note that running select() here isn't
1107 // inefficient, because if the alarm was expired then pre_select()
1108 // returned true anyway and short-circuited the previous select().
1109 TRACE("hello-%p\n", this);
1110 return !alarm_was_ticking || select(0, readcb, writecb, exceptcb);
1114 void WvStream::terminate_continue_select()
1116 close();
1117 call_ctx = 0; // destroy the context, if necessary
1121 const WvAddr *WvStream::src() const
1123 return NULL;
1127 void WvStream::setcallback(IWvStreamCallback _callfunc)
1129 callfunc = _callfunc;
1130 call_ctx = 0; // delete any in-progress WvCont
1134 void WvStream::legacy_callback()
1136 execute();
1137 if (!!callfunc)
1138 callfunc();
1142 IWvStreamCallback WvStream::setreadcallback(IWvStreamCallback _callback)
1144 IWvStreamCallback tmp = readcb;
1146 readcb = _callback;
1148 return tmp;
1152 IWvStreamCallback WvStream::setwritecallback(IWvStreamCallback _callback)
1154 IWvStreamCallback tmp = writecb;
1156 writecb = _callback;
1158 return tmp;
1162 IWvStreamCallback WvStream::setexceptcallback(IWvStreamCallback _callback)
1164 IWvStreamCallback tmp = exceptcb;
1166 exceptcb = _callback;
1168 return tmp;
1172 IWvStreamCallback WvStream::setclosecallback(IWvStreamCallback _callback)
1174 IWvStreamCallback tmp = closecb;
1175 if (isok())
1176 closecb = _callback;
1177 else
1179 // already closed? notify immediately!
1180 closecb = 0;
1181 if (!!_callback)
1182 _callback();
1184 return tmp;
1188 void WvStream::unread(WvBuf &unreadbuf, size_t count)
1190 WvDynBuf tmp;
1191 tmp.merge(unreadbuf, count);
1192 tmp.merge(inbuf);
1193 inbuf.zap();
1194 inbuf.merge(tmp);
1198 IWvStream *WvStream::find_by_wsid(WSID wsid)
1200 IWvStream *retval = NULL;
1202 if (wsid_map)
1204 map<WSID, WvStream*>::iterator it = wsid_map->find(wsid);
1206 if (it != wsid_map->end())
1207 retval = it->second;
1210 return retval;