Minor fixes in the IO.STREAMS test suite.
[iolib.git] / io-multiplex / common.lisp
blob508bf76f9e372d20e970fdcfdd2b933f78a40c90
1 ;;;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp; Indent-tabs-mode: NIL -*-
2 ;;;
3 ;;; common.lisp --- Miscellaneous definitions.
4 ;;;
5 ;;; Copyright (C) 2006-2007, Stelian Ionescu <sionescu@common-lisp.net>
6 ;;;
7 ;;; This code is free software; you can redistribute it and/or
8 ;;; modify it under the terms of the version 2.1 of
9 ;;; the GNU Lesser General Public License as published by
10 ;;; the Free Software Foundation, as clarified by the
11 ;;; preamble found here:
12 ;;; http://opensource.franz.com/preamble.html
13 ;;;
14 ;;; This program is distributed in the hope that it will be useful,
15 ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
16 ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 ;;; GNU General Public License for more details.
18 ;;;
19 ;;; You should have received a copy of the GNU Lesser General
20 ;;; Public License along with this library; if not, write to the
21 ;;; Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22 ;;; Boston, MA 02110-1301, USA
24 (in-package :io.multiplex)
;;; Both registries must exist at compile time as well as at load time,
;;; hence the EVAL-WHEN wrapper.
(eval-when (:compile-toplevel :load-toplevel :execute)
  (defvar *available-multiplexers* nil
    "List of (PRIORITY . CLASS-NAME) conses registered by DEFINE-MULTIPLEXER.")
  (defvar *best-available-multiplexer* nil
    "Class designator handed to MAKE-INSTANCE when an EVENT-BASE creates
its multiplexer."))
(defvar *maximum-event-loop-timeout* 1
  "Upper bound passed to CALC-MIN-TIMEOUT when EVENT-DISPATCH computes how
long one polling step may block (presumably seconds -- confirm against
CALC-MIN-TIMEOUT).")
32 ;;;; EVENT-BASE
(defclass event-base ()
  ((mux
    :reader mux-of
    :initform (make-instance *best-available-multiplexer*)
    :documentation "Multiplexer instance used to poll the monitored FDs.")
   (fds
    :reader fds-of
    :initform (make-hash-table :test 'eql)
    :documentation "Hash table mapping an FD number to its FD-ENTRY.")
   (timeouts
    :reader timeouts-of
    :initform (make-queue)
    :documentation "Queue of every event that carries a timeout.")
   (exit
    :accessor exit-p
    :initform nil
    :documentation "Set to true to make EVENT-DISPATCH leave its loop.")
   (exit-when-empty
    :accessor exit-when-empty-p
    :initarg :exit-when-empty
    :documentation "When true, EVENT-DISPATCH also stops once no FD and no
timeout is registered any more."))
  (:default-initargs :exit-when-empty nil)
  (:documentation
   "State of one event loop: a multiplexer, the monitored FDs and the
pending timeouts."))
;;; Prints an unreadable summary: number of monitored FDs and the multiplexer.
;;; CLOSE resets the FDS slot to NIL and returns the event base, so the REPL
;;; immediately prints a closed base; HASH-TABLE-COUNT must therefore not be
;;; called unconditionally.  A closed base reports 0 monitored FDs.
(defmethod print-object ((base event-base) stream)
  (print-unreadable-object (base stream :type nil :identity t)
    (let ((fds (fds-of base)))
      (format stream "event base, ~A FDs monitored, using: ~A"
              (if fds (hash-table-count fds) 0)
              (mux-of base)))))
;;; Shuts the event base down: closes its multiplexer, then clears the FDS,
;;; TIMEOUTS and EXIT slots.  Returns the event base.  ABORT is ignored.
;;; NOTE(review): this calls CLOSE on the multiplexer although this file
;;; defines CLOSE-MULTIPLEXER -- confirm a CLOSE method exists for
;;; multiplexers elsewhere.
(defmethod close ((event-base event-base) &key abort)
  (declare (ignore abort))
  (close (mux-of event-base))
  (setf (slot-value event-base 'fds) nil)
  (setf (slot-value event-base 'timeouts) nil)
  (setf (slot-value event-base 'exit) nil)
  event-base)
(defgeneric add-fd (base fd event-type function &key timeout persistent)
  (:documentation
   "Register FUNCTION as a handler for EVENT-TYPE on file descriptor FD in
BASE.  Handlers are later called with the FD and the event type."))
(defgeneric add-timeout (event-base function timeout &key persistent)
  (:documentation
   "Register FUNCTION to be called when TIMEOUT expires.  Timeout handlers
are called with NIL and :TIMEOUT."))
(defgeneric remove-event (event-base event)
  (:documentation
   "Unregister EVENT (an object returned by ADD-FD or ADD-TIMEOUT) from
EVENT-BASE."))
(defgeneric remove-events (event-base event-list)
  (:documentation
   "Unregister every event in the list EVENT-LIST from EVENT-BASE."))
(defgeneric event-dispatch (event-base &key &allow-other-keys)
  (:documentation
   "Run the event loop of EVENT-BASE, dispatching FD events and expired
timeouts to their handlers."))
(defgeneric exit-event-loop (event-base &key delay)
  (:documentation
   "Ask the event loop of EVENT-BASE to stop by scheduling a one-shot
timeout, DELAY (default 0) from now, whose handler sets the exit flag.")
  (:method ((event-base event-base) &key (delay 0))
    (flet ((request-exit (fd event-type)
             (declare (ignore fd event-type))
             (setf (exit-p event-base) t)))
      (add-timeout event-base #'request-exit delay :persistent nil))))
(defgeneric event-base-empty-p (event-base)
  (:documentation
   "True when EVENT-BASE has no FD entry and no queued timeout.")
  (:method ((event-base event-base))
    (when (zerop (hash-table-count (fds-of event-base)))
      (queue-empty-p (timeouts-of event-base)))))
(defgeneric fd-entry-of (event-base fd)
  (:documentation
   "Look FD up in the FD table of EVENT-BASE.  Returns the GETHASH values:
the FD-ENTRY (or NIL) and a found-p flag.")
  (:method ((event-base event-base) fd)
    (let ((table (fds-of event-base)))
      (gethash fd table))))
;;; Low-level registration of EVENT in EVENT-BASE.  Returns EVENT.
;;; FD-ENTRY is only consulted for FD events; this function does NOT create
;;; a missing entry, so callers adding an FD event must supply one.
(defun %add-event (event-base event &optional fd-entry)
  ;; Events carrying a timeout also live in the timeout queue, kept sorted
  ;; by absolute timeout.
  (when (event-timeout event)
    (queue-sorted-insert (timeouts-of event-base) event
                         #'< #'event-abs-timeout))
  ;; FD events are added to FD-ENTRY, which is then (re)stored in the FD
  ;; hash table under the event's FD.
  (let ((fd (event-fd event)))
    (when fd
      (fd-entry-add-event fd-entry event)
      (setf (gethash fd (fds-of event-base)) fd-entry)))
  event)
;;; Low-level removal of EVENT from EVENT-BASE.  Returns EVENT.
;;; For an FD event the corresponding FD-ENTRY must exist (asserted).
(defun %remove-event (event-base event)
  ;; Drop the event from the timeout queue if it was put there.
  (when (event-timeout event)
    (queue-delete (timeouts-of event-base) event))
  ;; For an FD event, remove it from its fd-entry; if the fd-entry is
  ;; empty afterwards, remove the entry from the FD table as well.
  (let ((fd (event-fd event)))
    (when fd
      (let* ((table (fds-of event-base))
             (entry (gethash fd table)))
        (assert entry)
        (fd-entry-del-event entry event)
        (when (fd-entry-empty-p entry)
          (remhash fd table)))))
  event)
;;; Called before a new handler is queued.  Returns :READ-ADD or :WRITE-ADD
;;; when FD-ENTRY has no handler of that kind yet, NIL in every other case
;;; (including EVENT-TYPE :ERROR).
(defun calc-possible-edge-change-when-adding (fd-entry event-type)
  (case event-type
    (:read (when (queue-empty-p (fd-entry-read-events fd-entry))
             :read-add))
    (:write (when (queue-empty-p (fd-entry-write-events fd-entry))
              :write-add))))
;;; Registers FUNCTION as an EVENT-TYPE handler for FD and returns the new
;;; event object.
;;;  - FD must be an UNSIGNED-BYTE not above the multiplexer's FD limit
;;;    (when it has one), otherwise an error is signalled.
;;;  - If FD already has an entry, the event is added to it and the
;;;    multiplexer is updated only when this is the first handler of its kind.
;;;  - Otherwise a fresh entry is created and handed to MONITOR-FD; when
;;;    that returns false the event is removed again.
;;; Note that the event is returned even when monitoring failed.
(defmethod add-fd ((event-base event-base) fd event-type function
                   &key timeout persistent)
  (check-type fd unsigned-byte)
  (check-type event-type fd-event)
  (let ((limit (fd-limit-of (mux-of event-base))))
    (when (and limit (> fd limit))
      (error "Cannot add such a large FD: ~A" fd)))
  (let* ((entry (fd-entry-of event-base fd))
         (new-event (make-event fd event-type function persistent
                                (abs-timeout timeout)
                                (normalize-timeout timeout))))
    (cond (entry
           ;; The edge change must be computed before the event is queued.
           (let ((edge (calc-possible-edge-change-when-adding
                        entry event-type)))
             (%add-event event-base new-event entry)
             (when edge
               (setf (fd-entry-edge-change entry) edge)
               (update-fd (mux-of event-base) entry)
               (setf (fd-entry-edge-change entry) nil))))
          (t
           (let ((fresh-entry (make-fd-entry fd)))
             (%add-event event-base new-event fresh-entry)
             (unless (monitor-fd (mux-of event-base) fresh-entry)
               (%remove-event event-base new-event)))))
    (values new-event)))
;;; Creates a pure timeout event (no FD, type :TIMEOUT) and registers it.
;;; TIMEOUT must be non-NIL.  Returns the new event.
(defmethod add-timeout ((event-base event-base) function timeout
                        &key persistent)
  (assert timeout)
  (let ((timer (make-event nil :timeout function persistent
                           (abs-timeout timeout)
                           (normalize-timeout timeout))))
    (%add-event event-base timer)))
;;; Returns :READ-DEL or :WRITE-DEL when FD-ENTRY currently holds at least
;;; one handler of the given kind, NIL otherwise (including EVENT-TYPE
;;; :ERROR).  The answer reflects the queues at call time only.
(defun calc-possible-edge-change-when-removing (fd-entry event-type)
  (case event-type
    (:read (unless (queue-empty-p (fd-entry-read-events fd-entry))
             :read-del))
    (:write (unless (queue-empty-p (fd-entry-write-events fd-entry))
              :write-del))))
;;; Unregisters EVENT from EVENT-BASE and returns EVENT-BASE.
;;;
;;; FD events: the event is removed from its fd-entry.  If the entry is
;;; empty afterwards the FD is unmonitored.  Otherwise the multiplexer is
;;; updated with a :READ-DEL / :WRITE-DEL edge change, but only when the
;;; LAST handler of that kind has just been removed.  (Previously the edge
;;; change was derived solely from the pre-removal state, so removing one
;;; of several read handlers told the multiplexer to stop watching for
;;; readability while other read handlers were still registered.)
;;;
;;; Events with no fd-entry (e.g. pure timeouts, whose FD is NIL) are simply
;;; handed to %REMOVE-EVENT.
(defmethod remove-event ((event-base event-base) event)
  (check-type (event-type event) event-type)
  (let* ((type (event-type event))
         (entry (fd-entry-of event-base (event-fd event))))
    (cond (entry
           (let ((edge-change (calc-possible-edge-change-when-removing
                               entry type)))
             (%remove-event event-base event)
             (cond ((fd-entry-empty-p entry)
                    (unmonitor-fd (mux-of event-base) entry))
                   ;; EDGE-CHANGE is only non-NIL for :READ/:WRITE, so the
                   ;; queue lookup below never sees :TIMEOUT.
                   ((and edge-change
                         (queue-empty-p (fd-entry-event-list entry type)))
                    (setf (fd-entry-edge-change entry) edge-change)
                    (update-fd (mux-of event-base) entry)
                    (setf (fd-entry-edge-change entry) nil)))))
          (t
           (%remove-event event-base event))))
  (values event-base))
;;; Runs BODY with FUNCTION installed as a persistent EVENT-TYPE handler for
;;; FD in EVENT-BASE.  The handler is removed again when BODY exits, whether
;;; normally or through a non-local exit.  EVENT-BASE is evaluated only once.
(defmacro with-fd-handler ((event-base fd event-type function &optional timeout)
                           &body body)
  (once-only (event-base)
    (let ((handle (gensym "EVENT")))
      `(let ((,handle nil))
         (unwind-protect
              (progn
                (setf ,handle (add-fd ,event-base ,fd ,event-type ,function
                                      :persistent t
                                      :timeout ,timeout))
                ,@body)
           ;; HANDLE stays NIL if ADD-FD itself unwound.
           (when ,handle
             (remove-event ,event-base ,handle)))))))
;;; Wrapper around the main loop: clears the exit flag, schedules an
;;; automatic exit after TIMEOUT when one is given, then runs the primary
;;; method passing only :ONLY-ONCE along.
(defmethod event-dispatch :around ((event-base event-base)
                                   &key timeout only-once)
  (setf (exit-p event-base) nil)
  (unless (null timeout)
    (exit-event-loop event-base :delay timeout))
  (call-next-method event-base :only-once only-once))
;;; Re-bases the absolute timeout of every event in the TIMEOUTS queue on
;;; the current monotonic time (read once).  Returns NIL.
;;; NOTE(review): EVENT-DISPATCH calls this on every loop iteration, which
;;; pushes every deadline forward each time -- confirm this is intended.
(defun recalculate-timeouts (timeouts)
  (loop with now = (get-monotonic-time)
        for ev in (queue-head timeouts)
        do (event-recalc-abs-timeout ev now)))
;;; Calls the handler of each event in DISPATCH-LIST with NIL as the FD and
;;; :TIMEOUT as the event type.  Returns NIL.
(defun dispatch-timeouts (dispatch-list)
  (loop for ev in dispatch-list
        do (funcall (event-handler ev) nil :timeout)))
;;; Removes each event of EVENT-LIST via REMOVE-EVENT.  Returns NIL.
(defmethod remove-events ((event-base event-base) event-list)
  (loop for ev in event-list
        do (remove-event event-base ev)))
;;; Main loop.  Each iteration:
;;;   1. computes the poll timeout (nearest pending timeout, capped by
;;;      *MAXIMUM-EVENT-LOOP-TIMEOUT* via CALC-MIN-TIMEOUT);
;;;   2. stops if the exit flag is set, or if EXIT-WHEN-EMPTY is set and
;;;      nothing is registered any more;
;;;   3. recalculates the absolute timeouts;
;;;   4. polls and dispatches FD events once (with ONLY-ONCE, receiving any
;;;      FD event sets the exit flag);
;;;   5. dispatches expired timeouts, removes the non-persistent ones and
;;;      re-sorts the timeout queue.
;;; Returns NIL.
(defmethod event-dispatch ((event-base event-base) &key only-once)
  (with-accessors ((timeouts timeouts-of)) event-base
    (flet ((next-poll-timeout ()
             (calc-min-timeout (events-calc-min-rel-timeout timeouts)
                               *maximum-event-loop-timeout*))
           (finishedp ()
             (or (exit-p event-base)
                 (and (exit-when-empty-p event-base)
                      (event-base-empty-p event-base)))))
      (loop
        (let ((poll-timeout (next-poll-timeout)))
          (when (finishedp)
            (return nil))
          (recalculate-timeouts timeouts)
          (when (and (dispatch-fd-events-once event-base poll-timeout)
                     only-once)
            (setf (exit-p event-base) t))
          (multiple-value-bind (deletion-list dispatch-list)
              (filter-expired-events
               (expired-events timeouts (get-monotonic-time)))
            (dispatch-timeouts dispatch-list)
            (remove-events event-base deletion-list))
          (queue-sort timeouts #'< #'event-abs-timeout))))))
(defun dispatch-fd-events-once (event-base timeout)
  "Waits up to TIMEOUT for FD events and dispatches them.  Returns T if
some events have been received, NIL otherwise.

For each harvested (FD EVENT-TYPES) element:
 - on :ERROR the error handlers run and every event of the FD is scheduled
   for removal;
 - on :READ / :WRITE the matching handlers run and, unless an error was
   also reported, the one-shot events of that kind are scheduled for removal.
Scheduled events are removed after all handlers have run.  A warning is
issued for FDs that are not monitored."
  (let ((fd-events (harvest-events (mux-of event-base) timeout))
        (deletion-list '()))
    (flet ((schedule-deletion (events)
             ;; Always accumulate a private copy.  The lists handed in may
             ;; share structure with the live queue lists (APPEND shares its
             ;; last argument, REMOVE-IF may share a tail); destructively
             ;; extending them, or iterating over them while REMOVE-EVENT
             ;; deletes from those queues, could corrupt the queues.
             (setf deletion-list (nconc deletion-list (copy-list events)))))
      (dolist (ev fd-events)
        (destructuring-bind (fd ev-types) ev
          (let ((fd-entry (fd-entry-of event-base fd)))
            (if fd-entry
                (let ((errorp (member :error ev-types)))
                  (when errorp
                    (dispatch-error-events fd-entry)
                    (schedule-deletion (fd-entry-all-events fd-entry)))
                  (when (member :read ev-types)
                    (dispatch-read-events fd-entry)
                    (unless errorp
                      (schedule-deletion
                       (fd-entry-one-shot-events fd-entry :read))))
                  (when (member :write ev-types)
                    (dispatch-write-events fd-entry)
                    (unless errorp
                      (schedule-deletion
                       (fd-entry-one-shot-events fd-entry :write)))))
                (warn "Got spurious event for non-monitored FD: ~A" fd))))))
    (dolist (ev deletion-list)
      (remove-event event-base ev))
    (consp fd-events)))
;;; Filters QUEUE (via QUEUE-FILTER, keyed on EVENT-ABS-TIMEOUT) for events
;;; whose absolute timeout is set and not later than NOW.
(defun expired-events (queue now)
  (flet ((expiredp (abs-timeout)
           (and abs-timeout (<= abs-timeout now))))
    (queue-filter queue #'expiredp #'event-abs-timeout)))
;;; Splits the list of expired EVENTS.  Returns two fresh lists, both in
;;; reverse order of EVENTS:
;;;   1. the non-persistent events (to be deleted);
;;;   2. all events (to be dispatched).
(defun filter-expired-events (events)
  (values (reverse (remove-if #'event-persistent-p events))
          (reverse events)))
;;; Walks the TIMEOUTS queue in order and looks at the first event whose
;;; absolute timeout is either NIL or still in the future.  Returns the time
;;; remaining until that timeout, or NIL when there is no such event or its
;;; absolute timeout is NIL.
(defun events-calc-min-rel-timeout (timeouts)
  (let ((now (get-monotonic-time)))
    (dolist (ev (queue-head timeouts) nil)
      (let ((deadline (event-abs-timeout ev)))
        (cond ((null deadline)
               (return nil))
              ((< now deadline)
               (return (- deadline now))))))))
;;; Calls every error handler of FD-ENTRY with the FD and :ERROR.  Returns NIL.
(defun dispatch-error-events (fd-entry)
  (loop for ev in (queue-head (fd-entry-error-events fd-entry))
        do (funcall (event-handler ev) (fd-entry-fd fd-entry) :error)))
;;; Calls every read handler of FD-ENTRY with the FD and :READ.  Returns NIL.
(defun dispatch-read-events (fd-entry)
  (loop for ev in (queue-head (fd-entry-read-events fd-entry))
        do (funcall (event-handler ev) (fd-entry-fd fd-entry) :read)))
;;; Calls every write handler of FD-ENTRY with the FD and :WRITE.  Returns NIL.
(defun dispatch-write-events (fd-entry)
  (loop for ev in (queue-head (fd-entry-write-events fd-entry))
        do (funcall (event-handler ev) (fd-entry-fd fd-entry) :write)))
308 ;;;; FD-ENTRY
(deftype fd-event ()
  "Kinds of events that can be requested on a file descriptor."
  '(member :read :write :error))
(deftype event-type ()
  "Any FD-EVENT, or :TIMEOUT for pure timeout events."
  '(or fd-event (eql :timeout)))
;;; Per-FD bookkeeping: the handlers registered for one file descriptor,
;;; grouped by event kind.  Built with (MAKE-FD-ENTRY fd); no copier.
(defstruct (fd-entry
            (:constructor make-fd-entry (fd))
            (:copier nil))
  ;; the file descriptor this entry describes
  (fd           0            :type unsigned-byte)
  ;; set around UPDATE-FD calls to one of the :READ-ADD / :WRITE-ADD /
  ;; :READ-DEL / :WRITE-DEL markers, NIL otherwise
  (edge-change  nil          :type symbol)
  ;; one queue of events per kind
  (read-events  (make-queue) :type queue)
  (write-events (make-queue) :type queue)
  (error-events (make-queue) :type queue))
;;; Returns the event queue of FD-ENTRY that corresponds to EVENT-TYPE
;;; (:READ, :WRITE or :ERROR).  Both arguments are type-checked.
(defun fd-entry-event-list (fd-entry event-type)
  (check-type fd-entry fd-entry)
  (check-type event-type fd-event)
  ;; CHECK-TYPE above guarantees one of the three clauses matches.
  (ecase event-type
    (:read  (fd-entry-read-events fd-entry))
    (:write (fd-entry-write-events fd-entry))
    (:error (fd-entry-error-events fd-entry))))
;;; Setter paired with FD-ENTRY-EVENT-LIST:
;;;   (setf (fd-entry-event-list fd-entry event-type) event-list)
;;; stores EVENT-LIST as the queue of FD-ENTRY for EVENT-TYPE and returns it.
;;;
;;; A function named (SETF name) receives the NEW VALUE as its first
;;; argument, followed by the arguments of the place form.  The previous
;;; lambda list put FD-ENTRY first, so every call bound the new value to
;;; FD-ENTRY and failed the CHECK-TYPE.
(defun (setf fd-entry-event-list) (event-list fd-entry event-type)
  (check-type fd-entry fd-entry)
  (check-type event-type fd-event)
  (case event-type
    (:read  (setf (fd-entry-read-events fd-entry) event-list))
    (:write (setf (fd-entry-write-events fd-entry) event-list))
    (:error (setf (fd-entry-error-events fd-entry) event-list))))
;;; True when FD-ENTRY has no read, write or error events left.  The queues
;;; are tested in that order and testing stops at the first non-empty one.
(defun fd-entry-empty-p (fd-entry)
  (when (queue-empty-p (fd-entry-read-events fd-entry))
    (when (queue-empty-p (fd-entry-write-events fd-entry))
      (queue-empty-p (fd-entry-error-events fd-entry)))))
;;; Enqueues EVENT on the queue of FD-ENTRY that matches the event's type.
(defun fd-entry-add-event (fd-entry event)
  (let ((queue (fd-entry-event-list fd-entry (event-type event))))
    (queue-enqueue queue event)))
;;; Deletes EVENT from the queue of FD-ENTRY that matches the event's type.
(defun fd-entry-del-event (fd-entry event)
  (let ((queue (fd-entry-event-list fd-entry (event-type event))))
    (queue-delete queue event)))
;;; Returns a FRESH list of every event of FD-ENTRY: read events first, then
;;; write events, then error events.
;;;
;;; APPEND copies all of its arguments except the last one, so without the
;;; trailing empty list the result would share its tail with the list held
;;; by the error queue; callers that destructively extend the result (see
;;; the NCONCF in DISPATCH-FD-EVENTS-ONCE) would then modify that queue.
(defun fd-entry-all-events (fd-entry)
  (append (queue-head (fd-entry-read-events fd-entry))
          (queue-head (fd-entry-write-events fd-entry))
          (queue-head (fd-entry-error-events fd-entry))
          '()))
;;; Returns a FRESH list of the non-persistent events queued in FD-ENTRY for
;;; EVENT-TYPE, in queue order.
;;;
;;; REMOVE-IF is allowed to return a list that shares structure with its
;;; argument (typically when nothing needs removing), which would let callers
;;; that destructively extend the result modify the queue's own list.
;;; Collecting explicitly guarantees new conses.
(defun fd-entry-one-shot-events (fd-entry event-type)
  (loop for ev in (queue-head (fd-entry-event-list fd-entry event-type))
        unless (event-persistent-p ev)
          collect ev))
362 ;;;; Event
;;; One registered handler.  Built positionally with
;;;   (MAKE-EVENT fd type handler persistent-p abs-timeout timeout)
;;; and never copied.
(defstruct (event
            (:constructor make-event
                (fd type handler persistent-p abs-timeout timeout))
            (:copier nil))
  ;; the monitored file descriptor, or NIL for a pure timeout event
  (fd           nil :type (or null unsigned-byte))
  ;; :READ, :WRITE, :ERROR or :TIMEOUT
  (type         nil :type (or null event-type))
  ;; function called with the FD and the event type
  (handler      nil :type (or null function))
  ;; a non-persistent event is removed after it occurs or times out
  (persistent-p nil :type boolean)
  ;; point in time at which the event expires, NIL if it never does
  (abs-timeout  nil :type (or null timeout))
  ;; the timeout as normalized at registration; EVENT-RECALC-ABS-TIMEOUT
  ;; adds it to the current time
  (timeout      nil :type (or null timeout)))
;;; Returns true when both EVENT's absolute timeout and TIMEOUT are non-NIL
;;; and TIMEOUT is strictly smaller than the absolute timeout; NIL otherwise.
;;; NOTE(review): the comparison direction looks inverted for a predicate
;;; named "timed out" -- confirm against callers (none in this file).
(defun event-timed-out-p (event timeout)
  (let ((deadline (event-abs-timeout event)))
    (and deadline
         timeout
         (< timeout deadline))))
;;; Sets EVENT's absolute timeout to NOW plus its relative timeout and
;;; returns the new value.  The relative timeout must be a number.
(defun event-recalc-abs-timeout (event now)
  (let ((new-deadline (+ now (event-timeout event))))
    (setf (event-abs-timeout event) new-deadline)))
386 ;;;; Multiplexer
;;; GET-FD-LIMIT: limit against which ADD-FD checks file descriptors.

;;; Windows: foreign call to _getmaxstdio.
#+windows
(defcfun ("_getmaxstdio" get-fd-limit) :int)

;;; Elsewhere: RLIMIT_NOFILE minus one, or NIL when the limit is infinite.
#-windows
(defun get-fd-limit ()
  (let ((nofile (nix:getrlimit nix::rlimit-nofile)))
    (if (eql nofile nix::rlim-infinity)
        nil
        (1- nofile))))
(defclass multiplexer ()
  ((fd
    :reader fd-of
    :documentation "Descriptor owned by the multiplexer; closed by
CLOSE-MULTIPLEXER.  Has no initform, so it is unbound until a subclass
sets it.")
   (fd-limit
    :reader fd-limit-of
    :initarg :fd-limit
    :initform (get-fd-limit)
    :documentation "Largest FD accepted by ADD-FD, or NIL for no limit."))
  (:documentation "Base class of all I/O multiplexer back-ends."))
;;; The default method was truncated in this copy (the form was left
;;; unbalanced).  It is restored to return T, i.e. report success: the
;;; :AROUND method and ADD-FD treat a false result as failure.
(defgeneric monitor-fd (mux fd-entry)
  (:documentation
   "Start monitoring the FD described by FD-ENTRY.  Returns true on success.")
  (:method ((mux multiplexer) fd-entry)
    (declare (ignore fd-entry))
    t))
;;; The default method was truncated in this copy (the form was left
;;; unbalanced).  It is restored to return T, i.e. report success: the
;;; :AROUND method treats a false result as failure.
(defgeneric update-fd (mux fd-entry)
  (:documentation
   "Tell the multiplexer that the set of events wanted for FD-ENTRY changed;
the change is described by the entry's EDGE-CHANGE slot.  Returns true on
success.")
  (:method ((mux multiplexer) fd-entry)
    (declare (ignore fd-entry))
    t))
;;; The default method was truncated in this copy (the form was left
;;; unbalanced).  It is restored to return T, i.e. report success: the
;;; :AROUND method treats a false result as failure.
(defgeneric unmonitor-fd (mux fd-entry)
  (:documentation
   "Stop monitoring the FD described by FD-ENTRY.  Returns true on success.")
  (:method ((mux multiplexer) fd-entry)
    (declare (ignore fd-entry))
    t))
;;; Returns a list of fd/result pairs which have one of these forms:
;;;   (fd (:read))
;;;   (fd (:write))
;;;   (fd (:read :write))
;;;   (fd . :error)
;;; NOTE(review): DISPATCH-FD-EVENTS-ONCE destructures every element as a
;;; two-element list (FD EVENT-TYPES) and looks for :ERROR inside
;;; EVENT-TYPES, which the dotted (fd . :error) form would not satisfy --
;;; confirm what the back-ends actually return.
(defgeneric harvest-events (mux timeout)
  (:documentation
   "Wait up to TIMEOUT for activity on the FDs monitored by MUX and return
a list describing, per FD, the kinds of events that occurred."))
(defgeneric close-multiplexer (mux)
  (:method-combination progn :most-specific-last)
  (:documentation
   "Release the resources held by MUX.  Methods are combined with PROGN,
least specific first.")
  (:method progn ((mux multiplexer))
    ;; The FD slot has neither initform nor initarg, so it may still be
    ;; unbound here (e.g. when initialization of a subclass failed);
    ;; SLOT-VALUE alone would then signal UNBOUND-SLOT.
    (when (and (slot-boundp mux 'fd)
               (slot-value mux 'fd))
      (nix:close (fd-of mux))
      ;; Mark as closed so a second call does not close the FD again.
      (setf (slot-value mux 'fd) nil))
    mux))
;;; Runs the real MONITOR-FD method with errors caught by
;;; IGNORE-AND-PRINT-ERRORS (presumably yielding NIL when an error occurred
;;; -- confirm).  Returns T on success; on failure issues a warning and
;;; returns NIL.
;;; The success branch was missing from this copy, so the warning fired on
;;; success and the method always returned NIL, which made ADD-FD undo every
;;; registration of a new FD.
(defmethod monitor-fd :around ((mux multiplexer) fd-entry)
  (if (ignore-and-print-errors (call-next-method))
      t
      (warn "FD monitoring failed for FD ~A."
            (fd-entry-fd fd-entry))))
;;; Runs the real UPDATE-FD method with errors caught by
;;; IGNORE-AND-PRINT-ERRORS (presumably yielding NIL when an error occurred
;;; -- confirm).  Returns T on success; on failure issues a warning and
;;; returns NIL.
;;; The success branch was missing from this copy, so the warning fired on
;;; success and the method always returned NIL.
(defmethod update-fd :around ((mux multiplexer) fd-entry)
  (if (ignore-and-print-errors (call-next-method))
      t
      (warn "FD status update failed for FD ~A."
            (fd-entry-fd fd-entry))))
;;; Runs the real UNMONITOR-FD method with errors caught by
;;; IGNORE-AND-PRINT-ERRORS (presumably yielding NIL when an error occurred
;;; -- confirm).  Returns T on success; on failure issues a warning and
;;; returns NIL.
;;; The success branch was missing from this copy, so the warning fired on
;;; success and the method always returned NIL.
(defmethod unmonitor-fd :around ((mux multiplexer) fd-entry)
  (if (ignore-and-print-errors (call-next-method))
      t
      (warn "FD unmonitoring failed for FD ~A."
            (fd-entry-fd fd-entry))))
;;; Defines the multiplexer class NAME (SUPERCLASSES, SLOTS and OPTIONS are
;;; passed to DEFCLASS unchanged) and registers (PRIORITY . NAME) in
;;; *AVAILABLE-MULTIPLEXERS*.
;;;
;;; The registration conses a new pair on every evaluation, so PUSHNEW needs
;;; :TEST #'EQUAL; with the default EQL test a fresh cons is never found and
;;; reloading the definition would register the multiplexer again each time.
(defmacro define-multiplexer (name priority superclasses slots &rest options)
  `(progn
     (defclass ,name ,superclasses ,slots ,@options)
     (pushnew (cons ,priority ',name)
              *available-multiplexers*
              :test #'equal)))
457 ;;;; Misc
459 ;;; FIXME: Until a way to autodetect platform features is implemented
;;; On Darwin NIX::POLLRDHUP is defined here as 0, so LOGIOR-ing it into a
;;; poll event mask (see %WAIT-UNTIL-FD-READY) contributes no bits.
;;; NOTE(review): presumably the platform has no POLLRDHUP of its own -- confirm.
460 #+darwin
461 (defconstant nix::pollrdhup 0)
(define-condition poll-error (error)
  ;; FD defaults to NIL: POLL-ERROR is also signalled without an :FD initarg
  ;; (the Windows wait helper only has a handle), and without an initform
  ;; the report function below would hit an unbound slot while printing.
  ((fd :initarg :fd
       :initform nil
       :reader poll-error-fd)
   (identifier :initarg :identifier
               :initform "<Unknown error>"
               :reader poll-error-identifier))
  (:report (lambda (condition stream)
             (format stream "Error caught while polling file descriptor ~A: ~A"
                     (poll-error-fd condition)
                     (poll-error-identifier condition))))
  (:documentation
   "Signaled when an error occurs while polling for I/O readiness
of a file descriptor."))
475 ;;; This should probably be moved elsewhere. Also, it's quite a mess.
;;; Windows-only implementation of %WAIT-UNTIL-FD-READY plus the foreign
;;; definitions it needs.  Everything inside the PROGN below is read only
;;; when the WINDOWS feature is present.
476 #+windows
477 (progn
;; Foreign libraries that provide the functions bound below.
478 (load-foreign-library "User32.dll")
479 (load-foreign-library "msvcrt.dll")
480 (load-foreign-library "Ws2_32.dll")
;; Foreign type aliases used in the signatures below.
482 (defctype dword :unsigned-long)
483 (defctype bool (:boolean :int))
;; Called below as GET-OSFHANDLE to obtain the OS handle behind an FD.
485 (cl-posix-ffi:defsyscall "get_osfhandle" :long
486 (fd :int))
;; Return codes of %WAIT, event-mask bits for WSA-EVENT-SELECT and socket
;; error codes compared against below.
488 (defconstant +wait-failed+ #xffffffff)
489 (defconstant +wait-abandoned+ #x80)
490 (defconstant +wait-object-0+ 0)
491 (defconstant +wait-timeout+ #x102)
492 (defconstant +true+ 1)
493 (defconstant +fd-read+ 1)
494 (defconstant +fd-write+ 2)
495 (defconstant +socket-error+ -1)
496 (defconstant +wsaenotsock+ 10038)
;; WaitForMultipleObjects, bound as %WAIT.
498 (defcfun ("WaitForMultipleObjects" %wait :cconv :stdcall) dword
499 (count dword)
500 (handles :pointer)
501 (wait-all bool)
502 (millis dword))
504 (defcfun ("WSAGetLastError" wsa-get-last-error :cconv :stdcall) :int)
506 (defcfun ("WSAEventSelect" wsa-event-select :cconv :stdcall) :int
507 (socket-handle :int)
508 (event-handle :int)
509 (event-mask :long))
511 (defcfun ("WSACreateEvent" wsa-create-event :cconv :stdcall) :int)
513 (defcfun ("WSACloseEvent" wsa-close-event :cconv :stdcall) bool
514 (event :int))
516 ;; this one is probably completely broken
;; Waits on the single HANDLE for up to TIMEOUT (converted with
;; TIMEOUT->MILISEC).  Signals POLL-ERROR -- without an :FD initarg -- when
;; the wait reports failure or abandonment.  Otherwise returns two values,
;; both true iff the wait returned +WAIT-OBJECT-0+.
517 (defun wait-for-multiple-objects (handle timeout)
518 (let ((ret (with-foreign-object (phandle :int)
519 (setf (mem-ref phandle :int) handle)
520 (%wait 1 phandle t (timeout->milisec timeout)))))
521 (when (or (eql ret +wait-failed+)
522 (eql ret +wait-abandoned+))
523 (error 'poll-error))
524 (let ((ready (= ret +wait-object-0+)))
525 ;; is this right?
526 (values ready ready))))
528 ;; wasn't handling :read-write properly so won't pretend to support it
;; Windows %WAIT-UNTIL-FD-READY.  EVENT-TYPE must be :READ or :WRITE (ECASE
;; signals an error for anything else, including :READ-WRITE).
;;  - Creates a WSA event and selects EVENT-TYPE on the FD's OS handle.
;;  - If the select fails with +WSAENOTSOCK+, waits on the handle itself;
;;    any other select failure signals POLL-ERROR for FD.
;;  - Otherwise waits on the WSA event and returns the result as the first
;;    value for :READ or the second value for :WRITE.
;; The WSA event is always closed on exit (UNWIND-PROTECT).
529 (defun %wait-until-fd-ready (fd event-type timeout)
530 (let ((handle (get-osfhandle fd))
531 (ev (wsa-create-event)))
532 (unwind-protect
533 (let ((ret (wsa-event-select handle ev (ecase event-type
534 (:read +fd-read+)
535 (:write +fd-write+)))))
536 (if (eql ret +socket-error+)
537 (if (= (wsa-get-last-error) +wsaenotsock+)
538 (wait-for-multiple-objects handle timeout)
539 (error 'poll-error :fd fd))
540 (let ((ret (wait-for-multiple-objects ev timeout)))
541 (ecase event-type
542 (:read (values ret nil))
543 (:write (values nil ret))))))
544 (wsa-close-event ev)))))
;;; poll(2)-based implementation of %WAIT-UNTIL-FD-READY.
;;;
;;; Polls the single descriptor FD for EVENT-TYPE (:READ, :WRITE or
;;; :READ-WRITE; anything else is an ECASE error) for at most TIMEOUT.
;;; Returns two values, READP and WRITEP; both are NIL when poll reports
;;; that no descriptor is ready.  Signals POLL-ERROR when poll fails with a
;;; POSIX error or reports POLLERR/POLLNVAL for the descriptor.
;;;
;;; Fix: the retry macro binds TMP-TIMEOUT (presumably to the time still
;;; remaining after an EINTR -- confirm against the macro definition), but
;;; the poll call used the original TIMEOUT, so every interrupted call
;;; restarted the wait with the full timeout.
#-windows
(defun %wait-until-fd-ready (fd event-type timeout)
  (flet ((choose-poll-flags (type)
           (ecase type
             (:read (logior nix::pollin nix::pollrdhup nix::pollpri))
             (:write (logior nix::pollout nix::pollhup))
             (:read-write (logior nix::pollin nix::pollrdhup nix::pollpri
                                  nix::pollout nix::pollhup))))
         (poll-error (unix-err)
           (error 'poll-error :fd fd
                  :identifier (nix:system-error-identifier unix-err))))
    (let ((readp nil) (writep nil))
      (with-foreign-object (pollfd 'nix::pollfd)
        ;; Start from a zeroed pollfd structure.
        (nix:bzero pollfd nix::size-of-pollfd)
        (with-foreign-slots ((nix::fd nix::events nix::revents)
                             pollfd nix::pollfd)
          (setf nix::fd fd
                nix::events (choose-poll-flags event-type))
          (handler-case
              (let ((ret (nix:repeat-upon-condition-decreasing-timeout
                             ((nix:eintr) tmp-timeout timeout)
                           (nix:poll pollfd 1 (timeout->milisec tmp-timeout)))))
                ;; Zero ready descriptors: nothing happened in time.
                (when (zerop ret)
                  (return-from %wait-until-fd-ready (values nil nil))))
            (nix:posix-error (err) (poll-error err)))
          (flags-case nix::revents
            ((nix::pollin nix::pollrdhup nix::pollpri)
             (setf readp t))
            ((nix::pollout nix::pollhup) (setf writep t))
            ((nix::pollerr nix::pollnval) (error 'poll-error :fd fd)))
          (values readp writep))))))
(defun wait-until-fd-ready (fd event-type &optional timeout)
  "Block until file descriptor `FD' is ready for I/O or `TIMEOUT' expires.
`EVENT-TYPE' selects what to wait for: :READ, :WRITE, or :READ-WRITE for
\"either :READ or :WRITE\" (the Windows implementation accepts only :READ
and :WRITE).  `TIMEOUT' is a non-negative number of seconds, or `NIL' to
wait without a time limit.  Returns two values: whether `FD' is readable
and whether it is writable."
  (multiple-value-bind (readp writep)
      (%wait-until-fd-ready fd event-type timeout)
    (values readp writep)))
(defun fd-ready-p (fd &optional (event-type :read))
  "Non-blocking readiness test for file descriptor `FD' (polls with a
timeout of 0).  `EVENT-TYPE' must be :READ, :WRITE or :READ-WRITE, the
latter meaning \"either :READ or :WRITE\"."
  (let* ((flags (multiple-value-list (wait-until-fd-ready fd event-type 0)))
         (readp (first flags))
         (writep (second flags)))
    (ecase event-type
      (:read readp)
      (:write writep)
      (:read-write (or readp writep)))))
;;; True when FD can be read from right now (polls with a timeout of 0).
(defun fd-readablep (fd)
  (values (wait-until-fd-ready fd :read 0)))
;;; True when FD can be written to right now (polls with a timeout of 0).
(defun fd-writablep (fd)
  (multiple-value-bind (readp writep)
      (wait-until-fd-ready fd :write 0)
    (declare (ignore readp))
    writep))