2 * Worldvisions Weaver Software:
3 * Copyright (C) 1997-2002 Net Integration Technologies, Inc.
5 * Unified support for streams, that is, sequences of bytes that may or
6 * may not be ready for read/write at any given time.
8 * We provide typical read and write routines, as well as a select() function
12 #include <sys/types.h>
14 #define __WVSTREAM_UNIT_TEST 1
16 #include "wvtimeutils.h"
18 #include "wvstreamsdebugger.h"
19 #include "wvstrutils.h"
20 #include "wvistreamlist.h"
21 #include "wvlinkerhack.h"
22 #include "wvmoniker.h"
25 #define ENOBUFS WSAENOBUFS
27 #define errno GetLastError()
29 #include <sys/socket.h>
42 // enable this to add some read/write trace messages (this can be VERY
// Debug tracing: print a printf-style message to stderr and flush at once.
// Wrapped in do/while(0) so TRACE(...) expands to exactly one statement and
// stays correct as the body of an unbraced if/else.
# define TRACE(x, y...) \
    do { fprintf(stderr, x, ## y); fflush(stderr); } while (0)
52 # define TRACE(x, y...)
58 WvStream
*WvStream::globalstream
= NULL
;
60 UUID_MAP_BEGIN(WvStream
)
61 UUID_MAP_ENTRY(IObject
)
62 UUID_MAP_ENTRY(IWvStream
)
66 static map
<WSID
, WvStream
*> *wsid_map
;
67 static WSID next_wsid_to_try
;
72 static IWvStream
*create_null(WvStringParm
, IObject
*)
74 return new WvStream();
77 static WvMoniker
<IWvStream
> reg("null", create_null
);
80 IWvStream
*IWvStream::create(WvStringParm moniker
, IObject
*obj
)
82 IWvStream
*s
= wvcreate
<IWvStream
>(moniker
, obj
);
86 s
->seterr_both(EINVAL
, "Unknown moniker '%s'", moniker
);
87 WVRELEASE(obj
); // we're not going to use it after all
// Returns true if 'str' begins with 'prefix', ignoring ASCII case.
// An empty prefix matches every string.
//
// No separate length check on 'str' is needed: strncasecmp() stops at the
// first NUL, so a 'str' shorter than 'prefix' compares unequal when its
// terminator meets a non-NUL prefix character.  Skipping strlen(str) keeps
// each call bounded by the prefix length instead of the whole string.
static bool is_prefix_insensitive(const char *str, const char *prefix)
{
    return strncasecmp(str, prefix, strlen(prefix)) == 0;
}
100 static const char *strstr_insensitive(const char *haystack
, const char *needle
)
102 while (*haystack
!= '\0')
104 if (is_prefix_insensitive(haystack
, needle
))
112 static bool contains_insensitive(const char *haystack
, const char *needle
)
114 return strstr_insensitive(haystack
, needle
) != NULL
;
118 static const char *list_format
= "%6s%s%2s%s%3s%s%3s%s%6s%s%20s%s%s";
// Maps a boolean onto the literal "Yes" or "No".
static inline const char *Yes_No(bool val)
{
    if (val)
        return "Yes";
    return "No";
}
125 void WvStream::debugger_streams_display_header(WvStringParm cmd
,
126 WvStreamsDebugger::ResultCallback result_cb
)
129 result
.append(list_format
, "--WSID", "-", "RC", "-", "-Ok", "-", "-Cs", "-", "-AlRem", "-",
130 "----------------Type", "-", "Name--------------------");
131 result_cb(cmd
, result
);
135 // Set to fit in 6 chars
136 static WvString
friendly_ms(time_t ms
)
139 return WvString("(%s)", ms
);
141 return WvString("%sms", ms
);
142 else if (ms
< 60*1000)
143 return WvString("%ss", ms
/1000);
144 else if (ms
< 60*60*1000)
145 return WvString("%sm", ms
/(60*1000));
146 else if (ms
<= 24*60*60*1000)
147 return WvString("%sh", ms
/(60*60*1000));
149 return WvString("%sd", ms
/(24*60*60*1000));
152 void WvStream::debugger_streams_display_one_stream(WvStream
*s
,
154 WvStreamsDebugger::ResultCallback result_cb
)
158 unsigned refcount
= s
->release();
159 result
.append(list_format
,
162 Yes_No(s
->isok()), " ",
163 Yes_No(s
->uses_continue_select
), " ",
164 friendly_ms(s
->alarm_remaining()), " ",
167 result_cb(cmd
, result
);
171 void WvStream::debugger_streams_maybe_display_one_stream(WvStream
*s
,
173 const WvStringList
&args
,
174 WvStreamsDebugger::ResultCallback result_cb
)
176 bool show
= args
.isempty();
177 WvStringList::Iter
arg(args
);
178 for (arg
.rewind(); arg
.next(); )
181 bool is_num
= wvstring_to_num(*arg
, wsid
);
185 if (s
->wsid() == wsid
)
193 if (s
->wsname() && contains_insensitive(s
->wsname(), *arg
)
194 || s
->wstype() && contains_insensitive(s
->wstype(), *arg
))
202 debugger_streams_display_one_stream(s
, cmd
, result_cb
);
206 WvString
WvStream::debugger_streams_run_cb(WvStringParm cmd
,
208 WvStreamsDebugger::ResultCallback result_cb
, void *)
210 debugger_streams_display_header(cmd
, result_cb
);
213 map
<WSID
, WvStream
*>::iterator it
;
215 for (it
= wsid_map
->begin(); it
!= wsid_map
->end(); ++it
)
216 debugger_streams_maybe_display_one_stream(it
->second
, cmd
, args
,
220 return WvString::null
;
224 WvString
WvStream::debugger_close_run_cb(WvStringParm cmd
,
226 WvStreamsDebugger::ResultCallback result_cb
, void *)
229 return WvString("Usage: %s <WSID>", cmd
);
231 WvString wsid_str
= args
.popstr();
232 if (!wvstring_to_num(wsid_str
, wsid
))
233 return WvString("Invalid WSID '%s'", wsid_str
);
234 IWvStream
*s
= WvStream::find_by_wsid(wsid
);
236 return WvString("No such stream");
238 return WvString::null
;
242 void WvStream::add_debugger_commands()
244 WvStreamsDebugger::add_command("streams", 0, debugger_streams_run_cb
, 0);
245 WvStreamsDebugger::add_command("close", 0, debugger_close_run_cb
, 0);
249 WvStream::WvStream():
250 read_requires_writable(NULL
),
251 write_requires_readable(NULL
),
252 uses_continue_select(false),
253 personal_stack_size(131072),
254 alarm_was_ticking(false),
258 readcb(wv::bind(&WvStream::legacy_callback
, this)),
260 outbuf_delayed_flush(false),
266 alarm_time(wvtime_zero
),
267 last_alarm_check(wvtime_zero
)
269 TRACE("Creating wvstream %p\n", this);
271 static bool first
= true;
275 WvStream::add_debugger_commands();
280 wsid_map
= new map
<WSID
, WvStream
*>;
281 WSID first_wsid_tried
= next_wsid_to_try
;
284 if (wsid_map
->find(next_wsid_to_try
) == wsid_map
->end())
287 } while (next_wsid_to_try
!= first_wsid_tried
);
288 my_wsid
= next_wsid_to_try
++;
289 bool inserted
= wsid_map
->insert(make_pair(my_wsid
, this)).second
;
294 int result
= WSAStartup(MAKEWORD(2,0), &wsaData
);
300 // FIXME: interfaces (IWvStream) shouldn't have implementations!
301 IWvStream::IWvStream()
306 IWvStream::~IWvStream()
311 WvStream::~WvStream()
313 TRACE("destroying %p\n", this);
316 // if this assertion fails, then uses_continue_select is true, but you
317 // didn't call terminate_continue_select() or close() before destroying
318 // your object. Shame on you!
319 assert(!uses_continue_select
|| !call_ctx
);
321 call_ctx
= 0; // finish running the suspended callback, if any
324 wsid_map
->erase(my_wsid
);
325 if (wsid_map
->empty())
331 // eventually, streams will auto-add themselves to the globallist. But
332 // even before then, it'll never be useful for them to be on the
333 // globallist *after* they get destroyed, so we might as well auto-remove
334 // them already. It's harmless for people to try to remove them twice.
335 WvIStreamList::globallist
.unlink(this);
337 TRACE("done destroying %p\n", this);
341 void WvStream::close()
343 TRACE("flushing in wvstream...\n");
344 flush(2000); // fixme: should not hardcode this stuff
345 TRACE("(flushed)\n");
351 IWvStreamCallback cb
= closecb
;
352 closecb
= 0; // ensure callback is only called once
356 // I would like to delete call_ctx here, but then if someone calls
357 // close() from *inside* a continuable callback, we explode. Oops!
358 //call_ctx = 0; // destroy the context, if necessary
362 void WvStream::autoforward(WvStream
&s
)
364 setcallback(wv::bind(autoforward_callback
, wv::ref(*this), wv::ref(s
)));
365 read_requires_writable
= &s
;
369 void WvStream::noautoforward()
372 read_requires_writable
= NULL
;
376 void WvStream::autoforward_callback(WvStream
&input
, WvStream
&output
)
381 len
= input
.read(buf
, sizeof(buf
));
382 output
.write(buf
, len
);
386 void WvStream::_callback()
394 void *WvStream::_callwrap(void *)
401 void WvStream::callback()
405 // if the alarm has gone off and we're calling callback... good!
406 if (alarm_remaining() == 0)
408 alarm_time
= wvtime_zero
;
409 alarm_was_ticking
= true;
412 alarm_was_ticking
= false;
414 assert(!uses_continue_select
|| personal_stack_size
>= 1024);
416 #define TEST_CONTINUES_HARSHLY 0
417 #if TEST_CONTINUES_HARSHLY
419 # warning "Using WvCont for *all* streams for testing!"
423 if (uses_continue_select
&& personal_stack_size
>= 1024)
426 if (!call_ctx
) // no context exists yet!
428 call_ctx
= WvCont(wv::bind(&WvStream::_callwrap
, this, _1
),
429 personal_stack_size
);
437 // if this assertion fails, a derived class's virtual execute() function
438 // didn't call its parent's execute() function, and we didn't make it
439 // all the way back up to WvStream::execute(). This doesn't always
440 // matter right now, but it could lead to obscure bugs later, so we'll
445 bool WvStream::isok() const
447 return !closed
&& WvErrorBase::isok();
451 void WvStream::seterr(int _errnum
)
453 if (!geterr()) // no pre-existing error
455 WvErrorBase::seterr(_errnum
);
461 size_t WvStream::read(WvBuf
&outbuf
, size_t count
)
463 // for now, just wrap the older read function
464 size_t free
= outbuf
.free();
469 unsigned char *buf
= tmp
.alloc(count
);
470 size_t len
= read(buf
, count
);
471 tmp
.unalloc(count
- len
);
477 size_t WvStream::write(WvBuf
&inbuf
, size_t count
)
479 // for now, just wrap the older write function
480 size_t avail
= inbuf
.used();
483 const unsigned char *buf
= inbuf
.get(count
);
484 size_t len
= write(buf
, count
);
485 inbuf
.unget(count
- len
);
490 size_t WvStream::read(void *buf
, size_t count
)
492 assert(!count
|| buf
);
495 unsigned char *newbuf
;
498 if (bufu
< queue_min
)
500 newbuf
= inbuf
.alloc(queue_min
- bufu
);
502 i
= uread(newbuf
, queue_min
- bufu
);
503 inbuf
.unalloc(queue_min
- bufu
- i
);
508 if (bufu
< queue_min
)
514 // if buffer is empty, do a hard read
516 bufu
= uread(buf
, count
);
519 // otherwise just read from the buffer
523 memcpy(buf
, inbuf
.get(bufu
), bufu
);
526 TRACE("read obj 0x%08x, bytes %d/%d\n", (unsigned int)this, bufu
, count
);
532 size_t WvStream::write(const void *buf
, size_t count
)
534 assert(!count
|| buf
);
535 if (!isok() || !buf
|| !count
|| stop_write
) return 0;
538 if (!outbuf_delayed_flush
&& !outbuf
.used())
540 wrote
= uwrite(buf
, count
);
542 buf
= (const unsigned char *)buf
+ wrote
;
543 // if (!count) return wrote; // short circuit if no buffering needed
545 if (max_outbuf_size
!= 0)
547 size_t canbuffer
= max_outbuf_size
- outbuf
.used();
548 if (count
> canbuffer
)
549 count
= canbuffer
; // can't write the whole amount
553 outbuf
.put(buf
, count
);
569 void WvStream::noread()
576 void WvStream::nowrite()
583 void WvStream::maybe_autoclose()
585 if (stop_read
&& stop_write
&& !outbuf
.used() && !inbuf
.used() && !closed
)
590 bool WvStream::isreadable()
592 return isok() && select(0, true, false, false);
596 bool WvStream::iswritable()
598 return !stop_write
&& isok() && select(0, false, true, false);
602 char *WvStream::blocking_getline(time_t wait_msec
, int separator
,
605 assert(separator
>= 0);
606 assert(separator
<= 255);
608 //assert(uses_continue_select || wait_msec == 0);
610 WvTime
timeout_time(0);
612 timeout_time
= msecadd(wvtime(), wait_msec
);
616 // if we get here, we either want to wait a bit or there is data
620 // fprintf(stderr, "(inbuf used = %d)\n", inbuf.used()); fflush(stderr);
623 // if there is a newline already, we have enough data.
624 if (inbuf
.strchr(separator
) > 0)
626 else if (!isok() || stop_read
) // uh oh, stream is in trouble.
629 // make select not return true until more data is available
630 queuemin(inbuf
.used() + 1);
632 // compute remaining timeout
635 wait_msec
= msecdiff(timeout_time
, wvtime());
640 // FIXME: this is blocking_getline. It shouldn't
641 // call continue_select()!
643 if (wait_msec
!= 0 && uses_continue_select
)
644 hasdata
= continue_select(wait_msec
);
646 hasdata
= select(wait_msec
, true, false);
655 unsigned char *buf
= tmp
.alloc(readahead
);
657 size_t len
= uread(buf
, readahead
);
658 tmp
.unalloc(readahead
- len
);
659 inbuf
.put(tmp
.get(len
), len
);
660 hasdata
= len
> 0; // enough?
666 if (!hasdata
&& wait_msec
== 0)
667 return NULL
; // handle timeout
672 // return the appropriate data
674 i
= inbuf
.strchr(separator
);
676 char *eol
= (char *)inbuf
.mutablepeek(i
- 1, 1);
677 assert(eol
&& *eol
== separator
);
679 return const_cast<char*>((const char *)inbuf
.get(i
));
681 // handle "EOF without newline" condition
682 // FIXME: it's very silly that buffers can't return editable
684 inbuf
.alloc(1)[0] = 0; // null-terminate it
685 return const_cast<char *>((const char *)inbuf
.get(inbuf
.used()));
690 char *WvStream::continue_getline(time_t wait_msec
, int separator
,
693 assert(false && "not implemented, come back later!");
694 assert(uses_continue_select
);
699 void WvStream::drain()
703 read(buf
, sizeof(buf
));
707 bool WvStream::flush(time_t msec_timeout
)
709 if (is_flushing
) return false;
711 TRACE("%p flush starts\n", this);
714 want_to_flush
= true;
715 bool done
= flush_internal(msec_timeout
) // any other internal buffers
716 && flush_outbuf(msec_timeout
); // our own outbuf
719 TRACE("flush stops (%d)\n", done
);
724 bool WvStream::should_flush()
726 return want_to_flush
;
730 bool WvStream::flush_outbuf(time_t msec_timeout
)
732 TRACE("%p flush_outbuf starts (isok=%d)\n", this, isok());
733 bool outbuf_was_used
= outbuf
.used();
735 // do-nothing shortcut for speed
736 // FIXME: definitely makes a "measurable" difference...
737 // but is it worth the risk?
738 if (!outbuf_was_used
&& !autoclose_time
&& !outbuf_delayed_flush
)
744 WvTime stoptime
= msecadd(wvtime(), msec_timeout
);
747 while (outbuf_was_used
&& isok())
749 // fprintf(stderr, "%p: fd:%d/%d, used:%d\n",
750 // this, getrfd(), getwfd(), outbuf.used());
752 size_t attempt
= outbuf
.optgettable();
753 size_t real
= uwrite(outbuf
.get(attempt
), attempt
);
755 // WARNING: uwrite() may have messed up our outbuf!
756 // This probably only happens if uwrite() closed the stream because
757 // of an error, so we'll check isok().
758 if (isok() && real
< attempt
)
760 TRACE("flush_outbuf: unget %d-%d\n", attempt
, real
);
761 assert(outbuf
.ungettable() >= attempt
- real
);
762 outbuf
.unget(attempt
- real
);
765 // since post_select() can call us, and select() calls post_select(),
766 // we need to be careful not to call select() if we don't need to!
767 // post_select() will only call us with msec_timeout==0, and we don't
768 // need to do select() in that case anyway.
771 if (msec_timeout
>= 0
772 && (stoptime
< wvtime() || !select(msec_timeout
, false, true)))
775 outbuf_was_used
= outbuf
.used();
779 if (autoclose_time
&& isok())
781 time_t now
= time(NULL
);
782 TRACE("Autoclose enabled for 0x%p - now-time=%ld, buf %d bytes\n",
783 this, now
- autoclose_time
, outbuf
.used());
784 if ((flush_internal(0) && !outbuf
.used()) || now
> autoclose_time
)
786 autoclose_time
= 0; // avoid infinite recursion!
791 TRACE("flush_outbuf: after autoclose chunk\n");
792 if (outbuf_delayed_flush
&& !outbuf_was_used
)
793 want_to_flush
= false;
795 TRACE("flush_outbuf: now isok=%d\n", isok());
797 // if we can't flush the outbuf, at least empty it!
798 if (outbuf_was_used
&& !isok())
802 TRACE("flush_outbuf stops\n");
804 return !outbuf_was_used
;
808 bool WvStream::flush_internal(time_t msec_timeout
)
810 // once outbuf emptied, that's it for most streams
815 int WvStream::getrfd() const
821 int WvStream::getwfd() const
827 void WvStream::flush_then_close(int msec_timeout
)
829 time_t now
= time(NULL
);
830 autoclose_time
= now
+ (msec_timeout
+ 999) / 1000;
832 TRACE("Autoclose SETUP for 0x%p - buf %d bytes, timeout %ld sec\n",
833 this, outbuf
.used(), autoclose_time
- now
);
835 // as a fast track, we _could_ close here: but that's not a good idea,
836 // since flush_then_close() deals with obscure situations, and we don't
837 // want the caller to use it incorrectly. So we make things _always_
838 // break when the caller forgets to call select() later.
844 void WvStream::pre_select(SelectInfo
&si
)
848 time_t alarmleft
= alarm_remaining();
850 if (!isok() || (!si
.inherit_request
&& alarmleft
== 0))
853 return; // alarm has rung
856 if (!si
.inherit_request
)
858 si
.wants
.readable
|= static_cast<bool>(readcb
);
859 si
.wants
.writable
|= static_cast<bool>(writecb
);
860 si
.wants
.isexception
|= static_cast<bool>(exceptcb
);
863 // handle read-ahead buffering
864 if (si
.wants
.readable
&& inbuf
.used() && inbuf
.used() >= queue_min
)
866 si
.msec_timeout
= 0; // already ready
870 && (alarmleft
< si
.msec_timeout
|| si
.msec_timeout
< 0))
871 si
.msec_timeout
= alarmleft
+ 10;
875 bool WvStream::post_select(SelectInfo
&si
)
877 if (!si
.inherit_request
)
879 si
.wants
.readable
|= static_cast<bool>(readcb
);
880 si
.wants
.writable
|= static_cast<bool>(writecb
);
881 si
.wants
.isexception
|= static_cast<bool>(exceptcb
);
884 // FIXME: need sane buffer flush support for non FD-based streams
885 // FIXME: need read_requires_writable and write_requires_readable
886 // support for non FD-based streams
888 // note: flush(nonzero) might call select(), but flush(0) never does,
892 if (!si
.inherit_request
&& alarm_remaining() == 0)
893 return true; // alarm ticked
894 if ((si
.wants
.readable
|| (!si
.inherit_request
&& readcb
))
895 && inbuf
.used() && inbuf
.used() >= queue_min
)
896 return true; // already ready
901 void WvStream::_build_selectinfo(SelectInfo
&si
, time_t msec_timeout
,
902 bool readable
, bool writable
, bool isexcept
, bool forceable
)
910 si
.wants
.readable
= readcb
;
911 si
.wants
.writable
= writecb
;
912 si
.wants
.isexception
= exceptcb
;
916 si
.wants
.readable
= readable
;
917 si
.wants
.writable
= writable
;
918 si
.wants
.isexception
= isexcept
;
922 si
.msec_timeout
= msec_timeout
;
923 si
.inherit_request
= ! forceable
;
924 si
.global_sure
= false;
929 if (globalstream
&& forceable
&& (globalstream
!= this))
931 WvStream
*s
= globalstream
;
932 globalstream
= NULL
; // prevent recursion
933 s
->xpre_select(si
, SelectRequest(false, false, false));
939 int WvStream::_do_select(SelectInfo
&si
)
943 tv
.tv_sec
= si
.msec_timeout
/ 1000;
944 tv
.tv_usec
= (si
.msec_timeout
% 1000) * 1000;
947 // selecting on an empty set of sockets doesn't cause a delay in win32.
948 SOCKET fakefd
= socket(PF_INET
, SOCK_STREAM
, 0);
949 FD_SET(fakefd
, &si
.except
);
953 int sel
= ::select(si
.max_fd
+1, &si
.read
, &si
.write
, &si
.except
,
954 si
.msec_timeout
>= 0 ? &tv
: (timeval
*)NULL
);
957 // EAGAIN and EINTR don't matter because they're totally normal.
958 // ENOBUFS is hopefully transient.
959 // EBADF is kind of gross and might imply that something is wrong,
960 // but it happens sometimes...
962 && errno
!= EAGAIN
&& errno
!= EINTR
972 TRACE("select() returned %d\n", sel
);
977 bool WvStream::_process_selectinfo(SelectInfo
&si
, bool forceable
)
979 // We cannot move the clock backward here, because timers that
980 // were expired in pre_select could then not be expired anymore,
981 // and while time going backward is rather unsettling in general,
982 // for it to be happening between pre_select and post_select is
983 // just outright insanity.
984 wvstime_sync_forward();
986 bool sure
= post_select(si
);
987 if (globalstream
&& forceable
&& (globalstream
!= this))
989 WvStream
*s
= globalstream
;
990 globalstream
= NULL
; // prevent recursion
991 si
.global_sure
= s
->xpost_select(si
, SelectRequest(false, false, false))
999 bool WvStream::_select(time_t msec_timeout
, bool readable
, bool writable
,
1000 bool isexcept
, bool forceable
)
1002 // Detect use of deleted stream
1003 assert(wsid_map
&& (wsid_map
->find(my_wsid
) != wsid_map
->end()));
1006 _build_selectinfo(si
, msec_timeout
, readable
, writable
, isexcept
,
1010 int sel
= _do_select(si
);
1012 sure
= _process_selectinfo(si
, forceable
);
1013 if (si
.global_sure
&& globalstream
&& forceable
&& (globalstream
!= this))
1014 globalstream
->callback();
1020 IWvStream::SelectRequest
WvStream::get_select_request()
1022 return IWvStream::SelectRequest(readcb
, writecb
, exceptcb
);
1026 void WvStream::force_select(bool readable
, bool writable
, bool isexception
)
1029 readcb
= wv::bind(&WvStream::legacy_callback
, this);
1031 writecb
= wv::bind(&WvStream::legacy_callback
, this);
1033 exceptcb
= wv::bind(&WvStream::legacy_callback
, this);
1037 void WvStream::undo_force_select(bool readable
, bool writable
, bool isexception
)
1048 void WvStream::alarm(time_t msec_timeout
)
1050 if (msec_timeout
>= 0)
1051 alarm_time
= msecadd(wvstime(), msec_timeout
);
1053 alarm_time
= wvtime_zero
;
1057 time_t WvStream::alarm_remaining()
1059 if (alarm_time
.tv_sec
)
1061 WvTime now
= wvstime();
1063 // Time is going backward!
1064 if (now
< last_alarm_check
)
1066 #if 0 // okay, I give up. Time just plain goes backwards on some systems.
1067 // warn only if it's a "big" difference (sigh...)
1068 if (msecdiff(last_alarm_check
, now
) > 200)
1069 fprintf(stderr
, " ************* TIME WENT BACKWARDS! "
1070 "(%ld:%ld %ld:%ld)\n",
1071 last_alarm_check
.tv_sec
, last_alarm_check
.tv_usec
,
1072 now
.tv_sec
, now
.tv_usec
);
1074 alarm_time
= tvdiff(alarm_time
, tvdiff(last_alarm_check
, now
));
1077 last_alarm_check
= now
;
1079 time_t remaining
= msecdiff(alarm_time
, now
);
1088 bool WvStream::continue_select(time_t msec_timeout
)
1090 assert(uses_continue_select
);
1092 // if this assertion triggers, you probably tried to do continue_select()
1093 // while inside terminate_continue_select().
1096 if (msec_timeout
>= 0)
1097 alarm(msec_timeout
);
1099 alarm(msec_timeout
);
1101 alarm(-1); // cancel the still-pending alarm, or it might go off later!
1103 // when we get here, someone has jumped back into our task.
1104 // We have to select(0) here because it's possible that the alarm was
1105 // ticking _and_ data was available. This is aggravated especially if
1106 // msec_delay was zero. Note that running select() here isn't
1107 // inefficient, because if the alarm was expired then pre_select()
1108 // returned true anyway and short-circuited the previous select().
1109 TRACE("hello-%p\n", this);
1110 return !alarm_was_ticking
|| select(0, readcb
, writecb
, exceptcb
);
1114 void WvStream::terminate_continue_select()
1117 call_ctx
= 0; // destroy the context, if necessary
1121 const WvAddr
*WvStream::src() const
1127 void WvStream::setcallback(IWvStreamCallback _callfunc
)
1129 callfunc
= _callfunc
;
1130 call_ctx
= 0; // delete any in-progress WvCont
1134 void WvStream::legacy_callback()
1142 IWvStreamCallback
WvStream::setreadcallback(IWvStreamCallback _callback
)
1144 IWvStreamCallback tmp
= readcb
;
1152 IWvStreamCallback
WvStream::setwritecallback(IWvStreamCallback _callback
)
1154 IWvStreamCallback tmp
= writecb
;
1156 writecb
= _callback
;
1162 IWvStreamCallback
WvStream::setexceptcallback(IWvStreamCallback _callback
)
1164 IWvStreamCallback tmp
= exceptcb
;
1166 exceptcb
= _callback
;
1172 IWvStreamCallback
WvStream::setclosecallback(IWvStreamCallback _callback
)
1174 IWvStreamCallback tmp
= closecb
;
1176 closecb
= _callback
;
1179 // already closed? notify immediately!
1188 void WvStream::unread(WvBuf
&unreadbuf
, size_t count
)
1191 tmp
.merge(unreadbuf
, count
);
1198 IWvStream
*WvStream::find_by_wsid(WSID wsid
)
1200 IWvStream
*retval
= NULL
;
1204 map
<WSID
, WvStream
*>::iterator it
= wsid_map
->find(wsid
);
1206 if (it
!= wsid_map
->end())
1207 retval
= it
->second
;