1 ;;;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp; Indent-tabs-mode: NIL -*-
3 ;;; common.lisp --- Miscellaneous definitions.
5 ;;; Copyright (C) 2006-2007, Stelian Ionescu <sionescu@common-lisp.net>
7 ;;; This code is free software; you can redistribute it and/or
8 ;;; modify it under the terms of the version 2.1 of
9 ;;; the GNU Lesser General Public License as published by
10 ;;; the Free Software Foundation, as clarified by the
11 ;;; preamble found here:
12 ;;; http://opensource.franz.com/preamble.html
14 ;;; This program is distributed in the hope that it will be useful,
15 ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
16 ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 ;;; GNU General Public License for more details.
19 ;;; You should have received a copy of the GNU Lesser General
20 ;;; Public License along with this library; if not, write to the
21 ;;; Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22 ;;; Boston, MA 02110-1301, USA
24 (in-package :io.multiplex
)
26 (eval-when (:compile-toplevel
:load-toplevel
:execute
)
27 (defvar *available-multiplexers
* nil
)
28 (defvar *default-multiplexer
* nil
))
30 (defvar *maximum-event-loop-timeout
* 1)
34 (defclass event-base
()
35 ((mux :initform
(make-instance *default-multiplexer
*)
36 :initarg
:mux
:reader mux-of
)
37 (fds :initform
(make-hash-table :test
'eql
)
39 (timeouts :initform
(make-queue)
43 (exit-when-empty :initarg
:exit-when-empty
44 :accessor exit-when-empty-p
))
45 (:default-initargs
:exit-when-empty nil
)
46 (:documentation
"An event base ..."))
48 (defmacro with-event-base
((var &rest initargs
) &body body
)
49 "Binds VAR to a new EVENT-BASE, instantiated with INITARGS,
50 within the extent of BODY. Closes VAR."
51 `(let ((,var
(make-instance 'event-base
,@initargs
)))
56 (defmethod print-object ((base event-base
) stream
)
57 (print-unreadable-object (base stream
:type nil
:identity t
)
58 (format stream
"event base, ~A FDs monitored, using: ~A"
59 ;; kludge: quick fix for printing closed event bases
60 (when (fds-of base
) (hash-table-count (fds-of base
)))
63 (defmethod initialize-instance :after
((base event-base
) &key
)
64 (with-slots (mux) base
66 (setq mux
(make-instance mux
)))))
68 ;;; KLUDGE: CLOSE is for streams. --luis
70 ;;; Also, we might want to close FDs here. Or have a version/argument
71 ;;; that handles that. Or... add finalizers to the fd streams.
72 (defmethod close ((event-base event-base
) &key abort
)
73 (declare (ignore abort
))
74 (with-accessors ((mux mux-of
)) event-base
75 (close-multiplexer mux
)
76 (dolist (slot '(fds timeouts exit
))
77 (setf (slot-value event-base slot
) nil
))
80 (defgeneric add-fd
(base fd event-type function
&key timeout one-shot
)
83 (defgeneric add-timeout
(event-base function timeout
&key persistent
)
86 (defgeneric remove-event
(event-base event
)
89 (defgeneric remove-events
(event-base event-list
)
92 (defgeneric event-dispatch
(event-base &key
&allow-other-keys
)
95 (defgeneric exit-event-loop
(event-base &key delay
)
97 (:method
((event-base event-base
) &key
(delay 0))
98 (add-timeout event-base
99 #'(lambda (fd event-type
)
100 (declare (ignore fd event-type
))
101 (setf (exit-p event-base
) t
))
102 delay
:persistent nil
)))
104 (defgeneric event-base-empty-p
(event-base)
106 (:method
((event-base event-base
))
107 (and (zerop (hash-table-count (fds-of event-base
)))
108 (queue-empty-p (timeouts-of event-base
)))))
110 (defgeneric fd-entry-of
(event-base fd
)
112 (:method
((event-base event-base
) fd
)
113 (gethash fd
(fds-of event-base
))))
115 (defun %add-event
(event-base event
&optional fd-entry
)
116 (with-accessors ((fds fds-of
) (timeouts timeouts-of
)) event-base
117 (when (event-timeout event
)
118 ;; add the event to the timeout queue
119 (queue-sorted-insert timeouts event
#'< #'event-abs-timeout
))
120 (let ((fd (event-fd event
)))
121 ;; if it's an FD event add it to its fd-entry int the FDs hash-table
122 ;; if there's no such fd-entry, create it
124 (fd-entry-add-event fd-entry event
)
125 (setf (gethash fd fds
) fd-entry
))
128 (defun %remove-event
(event-base event
)
129 (with-accessors ((fds fds-of
) (timeouts timeouts-of
)) event-base
130 (when (event-timeout event
)
131 ;; remove the event from the timeout queue
132 (queue-delete timeouts event
))
133 (let ((fd (event-fd event
)))
134 ;; if it's an FD event remove it from its fd-entry
135 ;; if the fd-emtry is then empty, remove it
137 (let ((fd-entry (gethash fd fds
)))
139 (fd-entry-del-event fd-entry event
)
140 (when (fd-entry-empty-p fd-entry
)
144 (defun calc-possible-edge-change-when-adding (fd-entry event-type
)
145 (cond ((and (eql event-type
:read
)
146 (queue-empty-p (fd-entry-read-events fd-entry
)))
148 ((and (eql event-type
:write
)
149 (queue-empty-p (fd-entry-write-events fd-entry
)))
152 (defmethod add-fd ((event-base event-base
) fd event-type function
153 &key timeout one-shot
)
154 (check-type fd unsigned-byte
)
155 (check-type event-type fd-event
)
156 (let ((fd-limit (fd-limit-of (mux-of event-base
))))
157 (when (and fd-limit
(> fd fd-limit
))
158 (error "Cannot add such a large FD: ~A" fd
)))
159 (let ((current-entry (fd-entry-of event-base fd
))
160 (event (make-event fd event-type function
(not one-shot
)
161 (abs-timeout timeout
)
162 (normalize-timeout timeout
)))
166 (setf edge-change
(calc-possible-edge-change-when-adding
167 current-entry event-type
))
168 (%add-event event-base event current-entry
)
170 (setf (fd-entry-edge-change current-entry
) edge-change
)
171 (update-fd (mux-of event-base
) current-entry
)
172 (setf (fd-entry-edge-change current-entry
) nil
)))
174 (setf current-entry
(make-fd-entry fd
))
175 (%add-event event-base event current-entry
)
176 (unless (monitor-fd (mux-of event-base
) current-entry
)
177 (%remove-event event-base event
))))
180 (defmethod add-timeout ((event-base event-base
) function timeout
183 (%add-event event-base
184 (make-event nil
:timeout function persistent
185 (abs-timeout timeout
) (normalize-timeout timeout
))))
187 (defun calc-possible-edge-change-when-removing (fd-entry event-type
)
188 (cond ((and (eql event-type
:read
)
189 (not (queue-empty-p (fd-entry-read-events fd-entry
))))
191 ((and (eql event-type
:write
)
192 (not (queue-empty-p (fd-entry-write-events fd-entry
))))
195 (defmethod remove-event ((event-base event-base
) event
)
196 (check-type (event-type event
) event-type
)
197 (let* ((fd (event-fd event
))
198 (current-entry (fd-entry-of event-base fd
))
202 (setf edge-change
(calc-possible-edge-change-when-removing
203 current-entry
(event-type event
)))
204 (%remove-event event-base event
)
205 (if (fd-entry-empty-p current-entry
)
206 (unmonitor-fd (mux-of event-base
) current-entry
)
208 (setf (fd-entry-edge-change current-entry
) edge-change
)
209 (update-fd (mux-of event-base
) current-entry
)
210 (setf (fd-entry-edge-change current-entry
) nil
))))
211 (%remove-event event-base event
)))
214 (defmacro with-fd-handler
((event-base fd event-type function
&optional timeout
)
217 (once-only (event-base)
218 (with-unique-names (event)
222 (setf ,event
(add-fd ,event-base
,fd
,event-type
,function
226 (remove-event ,event-base
,event
)))))))
228 (defmethod event-dispatch :around
((event-base event-base
)
229 &key timeout one-shot
)
230 (setf (exit-p event-base
) nil
)
232 (exit-event-loop event-base
:delay timeout
))
233 (call-next-method event-base
:one-shot one-shot
))
237 (defun recalculate-timeouts (timeouts)
238 (let ((now (osicat:get-monotonic-time
)))
239 (dolist (ev (queue-head timeouts
))
240 (event-recalc-abs-timeout ev now
))))
242 (defun dispatch-timeouts (dispatch-list)
243 (dolist (ev dispatch-list
)
244 (funcall (event-handler ev
) nil
:timeout
)))
246 (defmethod remove-events ((event-base event-base
) event-list
)
247 (dolist (ev event-list
)
248 (remove-event event-base ev
)))
250 (defmethod event-dispatch ((event-base event-base
) &key one-shot
)
251 (with-accessors ((mux mux-of
) (fds fds-of
)
252 (exit-p exit-p
) (exit-when-empty exit-when-empty-p
)
253 (timeouts timeouts-of
)) event-base
254 (flet ((recalc-poll-timeout ()
255 (calc-min-timeout (events-calc-min-rel-timeout timeouts
)
256 *maximum-event-loop-timeout
*)))
257 (do ((poll-timeout (recalc-poll-timeout) (recalc-poll-timeout))
258 (deletion-list () ())
259 (dispatch-list () ()))
260 ((or exit-p
(and exit-when-empty
(event-base-empty-p event-base
))))
261 ;; this seemed completely broken:
262 #-
(and) (recalculate-timeouts)
263 ;; ONE-SHOT used to mean that once an /FD event/ was
264 ;; dispatched the loop would exit. I'm changing that to exit
265 ;; for timeout events as well. Bad idea?
267 ;; something is (SETFing (EXIT-P EVENT-BAST) NIL) and that is
268 ;; causing the events to actually be dispatched twice. Why?
269 (when (and (dispatch-fd-events-once event-base poll-timeout
) one-shot
)
271 (setf (values deletion-list dispatch-list
)
272 (filter-expired-events
273 (expired-events timeouts
(osicat:get-monotonic-time
))))
274 (when (and dispatch-list one-shot
)
276 (dispatch-timeouts dispatch-list
)
277 (remove-events event-base deletion-list
)
278 (queue-sort timeouts
#'< #'event-abs-timeout
)))))
280 ;;; Waits for events and dispatches them. Returns T if some events
281 ;;; have been received, NIL otherwise.
282 (defun dispatch-fd-events-once (event-base timeout
)
283 (with-accessors ((mux mux-of
) (fds fds-of
) (timeouts timeouts-of
))
285 (let ((deletion-list ())
286 (fd-events (harvest-events mux timeout
)))
287 (dolist (ev fd-events
)
288 (destructuring-bind (fd ev-types
) ev
289 (let ((fd-entry (fd-entry-of event-base fd
)))
291 (let ((errorp (member :error ev-types
)))
293 (dispatch-error-events fd-entry
)
294 (nconcf deletion-list
295 (fd-entry-all-events fd-entry
)))
296 (when (member :read ev-types
)
297 (dispatch-read-events fd-entry
)
299 (nconcf deletion-list
300 (fd-entry-one-shot-events fd-entry
:read
))))
301 (when (member :write ev-types
)
302 (dispatch-write-events fd-entry
)
304 (nconcf deletion-list
305 (fd-entry-one-shot-events fd-entry
:write
)))))
306 (warn "Got spurious event for non-monitored FD: ~A" fd
)))))
307 (dolist (ev deletion-list
)
308 (remove-event event-base ev
))
311 (defun expired-events (queue now
)
313 #'(lambda (to) (and to
(<= to now
)))
314 #'event-abs-timeout
))
316 (defun filter-expired-events (events)
317 (let ((deletion-list ())
320 (push ev dispatch-list
)
321 (unless (event-persistent-p ev
)
322 (push ev deletion-list
)))
323 (values deletion-list dispatch-list
)))
325 (defun events-calc-min-rel-timeout (timeouts)
326 (let* ((now (osicat:get-monotonic-time
))
327 (first-valid-event (find-if #'(lambda (to)
328 (or (null to
) (< now to
)))
329 (queue-head timeouts
)
330 :key
#'event-abs-timeout
)))
331 (when (and first-valid-event
332 (event-abs-timeout first-valid-event
))
333 (- (event-abs-timeout first-valid-event
) now
))))
335 (defun dispatch-error-events (fd-entry)
336 (dolist (ev (queue-head (fd-entry-error-events fd-entry
)))
337 (funcall (event-handler ev
) (fd-entry-fd fd-entry
) :error
)))
339 (defun dispatch-read-events (fd-entry)
340 (dolist (ev (queue-head (fd-entry-read-events fd-entry
)))
341 (funcall (event-handler ev
) (fd-entry-fd fd-entry
) :read
)))
343 (defun dispatch-write-events (fd-entry)
344 (dolist (ev (queue-head (fd-entry-write-events fd-entry
)))
345 (funcall (event-handler ev
) (fd-entry-fd fd-entry
) :write
)))
350 '(member :read
:write
:error
))
352 (deftype event-type
()
353 '(or fd-event
(member :timeout
)))
355 (defstruct (fd-entry (:constructor make-fd-entry
(fd))
357 (fd 0 :type unsigned-byte
)
358 (edge-change nil
:type symbol
)
359 (read-events (make-queue) :type queue
)
360 (write-events (make-queue) :type queue
)
361 (error-events (make-queue) :type queue
))
363 (defun fd-entry-event-list (fd-entry event-type
)
364 (check-type fd-entry fd-entry
)
365 (check-type event-type fd-event
)
367 (:read
(fd-entry-read-events fd-entry
))
368 (:write
(fd-entry-write-events fd-entry
))
369 (:error
(fd-entry-error-events fd-entry
))))
371 (defun (setf fd-entry-event-list
) (fd-entry event-list event-type
)
372 (check-type fd-entry fd-entry
)
373 (check-type event-type fd-event
)
375 (:read
(setf (fd-entry-read-events fd-entry
) event-list
))
376 (:write
(setf (fd-entry-write-events fd-entry
) event-list
))
377 (:error
(setf (fd-entry-error-events fd-entry
) event-list
))))
379 (defun fd-entry-empty-p (fd-entry)
380 (and (queue-empty-p (fd-entry-read-events fd-entry
))
381 (queue-empty-p (fd-entry-write-events fd-entry
))
382 (queue-empty-p (fd-entry-error-events fd-entry
))))
384 (defun fd-entry-add-event (fd-entry event
)
385 (queue-enqueue (fd-entry-event-list fd-entry
(event-type event
))
388 (defun fd-entry-del-event (fd-entry event
)
389 (queue-delete (fd-entry-event-list fd-entry
(event-type event
))
392 (defun fd-entry-all-events (fd-entry)
393 (append (queue-head (fd-entry-read-events fd-entry
))
394 (queue-head (fd-entry-write-events fd-entry
))
395 (queue-head (fd-entry-error-events fd-entry
))))
397 (defun fd-entry-one-shot-events (fd-entry event-type
)
398 (remove-if #'event-persistent-p
399 (queue-head (fd-entry-event-list fd-entry event-type
))))
403 (defstruct (event (:constructor make-event
(fd type handler persistent-p
404 abs-timeout timeout
))
406 ;; a file descriptor or nil in case of a timeout
407 (fd nil
:type
(or null unsigned-byte
))
408 (type nil
:type
(or null event-type
))
409 (handler nil
:type
(or null function
))
410 ;; if an event is not persistent it is removed
411 ;; after it occurs or if it times out
412 (persistent-p nil
:type boolean
)
413 (abs-timeout nil
:type
(or null timeout
))
414 (timeout nil
:type
(or null timeout
)))
416 (defun event-timed-out-p (event timeout
)
417 (let ((ev-to (event-abs-timeout event
)))
418 (when (and ev-to timeout
)
423 (defun event-recalc-abs-timeout (event now
)
424 (setf (event-abs-timeout event
)
425 (+ now
(event-timeout event
))))
430 (defcfun ("_getmaxstdio" get-fd-limit
) :int
)
433 (defun get-fd-limit ()
434 (let ((fd-limit (nix:getrlimit nix
::rlimit-nofile
)))
435 (unless (eql fd-limit nix
::rlim-infinity
)
438 (defclass multiplexer
()
440 (fd-limit :initform
(get-fd-limit)
443 (closedp :accessor multiplexer-closedp
447 (defgeneric monitor-fd
(mux fd-entry
)
448 (:method
((mux multiplexer
) fd-entry
)
449 (declare (ignore fd-entry
))
452 (defgeneric update-fd
(mux fd-entry
)
453 (:method
((mux multiplexer
) fd-entry
)
454 (declare (ignore fd-entry
))
457 (defgeneric unmonitor-fd
(mux fd-entry
)
458 (:method
((mux multiplexer
) fd-entry
)
459 (declare (ignore fd-entry
))
462 ;;; Returns a list of fd/result pairs which have one of these forms:
465 ;;; (fd (:read :write))
467 (defgeneric harvest-events
(mux timeout
))
469 (defgeneric close-multiplexer
(mux)
470 (:method-combination progn
:most-specific-last
)
471 (:method
:around
((mux multiplexer
))
472 (unless (multiplexer-closedp mux
)
474 (setf (multiplexer-closedp mux
) t
)))
475 (:method progn
((mux multiplexer
))
476 (when (and (slot-boundp mux
'fd
) (not (null (fd-of mux
))))
477 (nix:close
(fd-of mux
))
478 (setf (slot-value mux
'fd
) nil
))
481 (defmethod monitor-fd :around
((mux multiplexer
) fd-entry
)
482 (if (ignore-and-print-errors (call-next-method))
484 (warn "FD monitoring failed for FD ~A."
485 (fd-entry-fd fd-entry
))))
487 (defmethod update-fd :around
((mux multiplexer
) fd-entry
)
488 (if (ignore-and-print-errors (call-next-method))
490 (warn "FD status update failed for FD ~A."
491 (fd-entry-fd fd-entry
))))
493 (defmethod unmonitor-fd :around
((mux multiplexer
) fd-entry
)
494 (if (ignore-and-print-errors (call-next-method))
496 (warn "FD unmonitoring failed for FD ~A."
497 (fd-entry-fd fd-entry
))))
499 (defmacro define-multiplexer
(name priority superclasses slots
&rest options
)
501 (defclass ,name
,superclasses
,slots
,@options
)
502 (pushnew (cons ,priority
',name
) *available-multiplexers
*
507 ;;; FIXME: Until a way to autodetect platform features is implemented
509 (defconstant nix
::pollrdhup
0)
511 (define-condition poll-error
(error)
512 ((fd :initarg
:fd
:reader poll-error-fd
)
513 (identifier :initarg
:identifier
:initform
"<Unknown error>"
514 :reader poll-error-identifier
))
515 (:report
(lambda (condition stream
)
516 (format stream
"Error caught while polling file descriptor ~A: ~A"
517 (poll-error-fd condition
)
518 (poll-error-identifier condition
))))
520 "Signaled when an error occurs while polling for I/O readiness
521 of a file descriptor."))
523 ;;; This should probably be moved elsewhere. Also, it's quite a mess.
526 (load-foreign-library "User32.dll")
527 (load-foreign-library "msvcrt.dll")
528 (load-foreign-library "Ws2_32.dll")
530 (defctype dword
:unsigned-long
)
531 (defctype bool
(:boolean
:int
))
533 (osicat-posix::defsyscall
"get_osfhandle" :long
536 (defconstant +wait-failed
+ #xffffffff
)
537 (defconstant +wait-abandoned
+ #x80
)
538 (defconstant +wait-object-0
+ 0)
539 (defconstant +wait-timeout
+ #x102
)
540 (defconstant +true
+ 1)
541 (defconstant +fd-read
+ 1)
542 (defconstant +fd-write
+ 2)
543 (defconstant +socket-error
+ -
1)
544 (defconstant +wsaenotsock
+ 10038)
546 (defcfun ("MsgWaitForMultipleObjects" %wait
:cconv
:stdcall
) dword
552 (defcfun ("WSAGetLastError" wsa-get-last-error
:cconv
:stdcall
) :int
)
554 (defcfun ("WSAEventSelect" wsa-event-select
:cconv
:stdcall
) :int
559 (defcfun ("WSACreateEvent" wsa-create-event
:cconv
:stdcall
) :int
)
561 (defcfun ("WSACloseEvent" wsa-close-event
:cconv
:stdcall
) bool
564 ;; this one is probably completely broken
565 (defun %wait-for-single-object
(handle timeout
)
566 (let ((ret (with-foreign-object (phandle :int
)
567 (setf (mem-ref phandle
:int
) handle
)
568 (%wait
1 phandle t
(timeout->milisec timeout
)))))
569 (when (or (eql ret
+wait-failed
+)
570 (eql ret
+wait-abandoned
+))
572 (let ((ready (= ret
+wait-object-0
+)))
574 (values ready ready
))))
576 ;; wasn't handling :read-write properly so won't pretend to support it
577 (defun %wait-until-fd-ready
(fd event-type timeout
)
578 (let ((handle (get-osfhandle fd
))
579 (ev (wsa-create-event)))
581 (let ((ret (wsa-event-select handle ev
(ecase event-type
583 (:write
+fd-write
+)))))
584 (if (eql ret
+socket-error
+)
585 (if (= (wsa-get-last-error) +wsaenotsock
+)
586 (wait-for-multiple-objects handle timeout
)
587 (error 'poll-error
:fd fd
))
588 (let ((ret (%wait-for-single-object ev timeout
)))
590 (:read
(values ret nil
))
591 (:write
(values nil ret
))))))
592 (wsa-close-event ev
)))))
595 (defun %wait-until-fd-ready
(fd event-type timeout
)
596 (flet ((choose-poll-flags (type)
598 (:read
(logior nix
::pollin nix
::pollrdhup nix
::pollpri
))
599 (:write
(logior nix
::pollout nix
::pollhup
))
600 (:read-write
(logior nix
::pollin nix
::pollrdhup nix
::pollpri
601 nix
::pollout nix
::pollhup
))))
602 (poll-error (unix-err)
603 (error 'poll-error
:fd fd
604 :identifier
(osicat-sys:system-error-identifier unix-err
))))
605 (let ((readp nil
) (writep nil
))
606 (with-foreign-object (pollfd 'nix
::pollfd
)
607 (nix:bzero pollfd nix
::size-of-pollfd
)
608 (with-foreign-slots ((nix::fd nix
::events nix
::revents
)
611 nix
::events
(choose-poll-flags event-type
))
613 (let ((ret (nix:repeat-upon-condition-decreasing-timeout
614 ((nix:eintr
) tmp-timeout timeout
)
615 (nix:poll pollfd
1 (timeout->milisec timeout
)))))
617 (return-from %wait-until-fd-ready
(values nil nil
))))
618 (nix:posix-error
(err) (poll-error err
)))
619 (flags-case nix
::revents
620 ((nix::pollin nix
::pollrdhup nix
::pollpri
)
622 ((nix::pollout nix
::pollhup
) (setf writep t
))
623 ((nix::pollerr nix
::pollnval
) (error 'poll-error
:fd fd
)))
624 (values readp writep
))))))
626 (defun wait-until-fd-ready (fd event-type
&optional timeout
)
627 "Poll file descriptor `FD' for I/O readiness. `EVENT-TYPE' must be
628 :READ, :WRITE or :READ-WRITE which means \"either :READ or :WRITE\".
629 `TIMEOUT' must be either a non-negative integer measured in seconds,
630 or `NIL' meaning no timeout at all."
631 (%wait-until-fd-ready fd event-type timeout
))
633 (defun fd-ready-p (fd &optional
(event-type :read
))
634 "Tests file-descriptor `FD' for I/O readiness. `EVENT-TYPE'
635 must be :READ, :WRITE or :READ-WRITE which means \"either :READ
637 (multiple-value-bind (readp writep
)
638 (wait-until-fd-ready fd event-type
0)
642 (:read-write
(or readp writep
)))))
644 (defun fd-readablep (fd)
645 (nth-value 0 (wait-until-fd-ready fd
:read
0)))
647 (defun fd-writablep (fd)
648 (nth-value 1 (wait-until-fd-ready fd
:write
0)))