Now using IOLIB-UTILS in all packages.
[iolib.git] / io-multiplex / common.lisp
blob6c2022024627e7514ebedd7b1ebe7566feb1ecca
1 ;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp -*-
3 ;; Copyright (C) 2006, 2007 Stelian Ionescu
4 ;;
5 ;; This code is free software; you can redistribute it and/or
6 ;; modify it under the terms of the version 2.1 of
7 ;; the GNU Lesser General Public License as published by
8 ;; the Free Software Foundation, as clarified by the
9 ;; preamble found here:
10 ;; http://opensource.franz.com/preamble.html
12 ;; This program is distributed in the hope that it will be useful,
13 ;; but WITHOUT ANY WARRANTY; without even the implied warranty of
14 ;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 ;; GNU General Public License for more details.
17 ;; You should have received a copy of the GNU Lesser General
18 ;; Public License along with this library; if not, write to the
19 ;; Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20 ;; Boston, MA 02110-1301, USA
22 (in-package :io.multiplex)
24 (eval-when (:compile-toplevel :load-toplevel :execute)
25 (defvar *available-multiplexers* nil)
26 (defvar *best-available-multiplexer* nil))
28 ;;;
29 ;;; Event-Base
30 ;;;
31 (defclass event-base ()
32 ((mux :initform (make-instance *best-available-multiplexer*)
33 :reader mux-of)
34 (fds :initform (make-hash-table :test 'eql)
35 :reader fds-of)
36 (timeouts :initform (make-queue)
37 :reader timeouts-of)
38 (main-timeout :initform nil
39 :accessor main-timeout-of)
40 (exit :initform nil
41 :accessor exit-p)))
44 (defmethod print-object ((base event-base) stream)
45 (print-unreadable-object (base stream :type nil :identity t)
46 (format stream "event base, ~A FDs monitored, using: ~A"
47 (hash-table-count (fds-of base))
48 (mux-of base))))
51 (defgeneric close-event-base (event-base)
52 (:method ((event-base event-base))
53 (with-accessors ((mux mux-of)) event-base
54 (close-multiplexer mux)
55 (mapc #'(lambda (slot)
56 (setf (slot-value event-base slot) nil))
57 '(fds timeouts exit))
58 event-base)))
61 (defgeneric add-fd (base fd event-type function &key timeout persistent))
64 (defgeneric add-timeout (event-base function timeout &key persistent))
67 (defgeneric remove-event (event-base event))
70 (defgeneric remove-events (event-base event-list))
73 (defgeneric event-dispatch (event-base &key timeout only-once))
76 (defgeneric exit-event-loop (event-base &key delay)
77 (:method ((event-base event-base) &key delay)
78 (setf (main-timeout-of event-base)
79 (add-timeout event-base
80 #'(lambda (fd event-type)
81 (declare (ignore fd event-type))
82 (setf (exit-p event-base) t))
83 delay :persistent nil))))
86 (defgeneric event-base-empty-p (event-base)
87 (:method ((event-base event-base))
88 (and (zerop (hash-table-count (fds-of event-base)))
89 (queue-empty-p (timeouts-of event-base)))))
92 (defgeneric fd-entry-of (event-base fd)
93 (:method ((event-base event-base) fd)
94 (gethash fd (fds-of event-base))))
97 (defun %add-event (event-base event &optional fd-entry)
98 (with-accessors ((fds fds-of) (timeouts timeouts-of)) event-base
99 (when (event-timeout event)
100 ;; add the event to the timeout queue
101 (queue-sorted-insert timeouts event #'< #'event-abs-timeout))
102 (let ((fd (event-fd event)))
103 ;; if it's an FD event add it to its fd-entry int the FDs hash-table
104 ;; if there's no such fd-entry, create it
105 (when fd
106 (fd-entry-add-event fd-entry event)
107 (setf (gethash fd fds) fd-entry))
108 (values event))))
111 (defun %remove-event (event-base event)
112 (with-accessors ((fds fds-of) (timeouts timeouts-of)) event-base
113 (when (event-timeout event)
114 ;; remove the event from the timeout queue
115 (queue-delete timeouts event))
116 (let ((fd (event-fd event)))
117 ;; if it's an FD event remove it from its fd-entry
118 ;; if the fd-emtry is then empty, remove it
119 (when fd
120 (let ((fd-entry (gethash fd fds)))
121 (assert fd-entry)
122 (fd-entry-del-event fd-entry event)
123 (when (fd-entry-empty-p fd-entry)
124 (remhash fd fds))))
125 (values event))))
128 (defun calc-possible-edge-change-when-adding (fd-entry event-type)
129 (cond ((and (eql event-type :read)
130 (queue-empty-p (fd-entry-read-events fd-entry)))
131 :read-add)
132 ((and (eql event-type :write)
133 (queue-empty-p (fd-entry-write-events fd-entry)))
134 :write-add)))
137 (defmethod add-fd ((event-base event-base) fd event-type function &key timeout persistent)
138 (check-type event-type fd-event)
140 (let ((fd-limit (fd-limit-of (mux-of event-base))))
141 (when (and fd-limit (> fd fd-limit))
142 (error "Cannot add such a large FD: ~A" fd)))
143 (let ((current-entry (fd-entry-of event-base fd))
144 (event (make-event fd event-type function persistent
145 (abs-timeout timeout)
146 (normalize-timeout timeout)))
147 (edge-change nil))
148 (if current-entry
149 (progn
150 (setf edge-change (calc-possible-edge-change-when-adding
151 current-entry event-type))
152 (%add-event event-base event current-entry)
153 (when edge-change
154 (setf (fd-entry-edge-change current-entry) edge-change)
155 (update-fd (mux-of event-base) current-entry)
156 (setf (fd-entry-edge-change current-entry) nil)))
157 (progn
158 (setf current-entry (make-fd-entry fd))
159 (%add-event event-base event current-entry)
160 (unless (monitor-fd (mux-of event-base) current-entry)
161 (%remove-event event-base event))))
162 (values event)))
165 (defmethod add-timeout ((event-base event-base) function timeout &key persistent)
166 (assert timeout)
167 (%add-event event-base (make-event nil :timeout function persistent
168 (abs-timeout timeout)
169 (normalize-timeout timeout))))
172 (defun calc-possible-edge-change-when-removing (fd-entry event-type)
173 (cond ((and (eql event-type :read)
174 (not (queue-empty-p (fd-entry-read-events fd-entry))))
175 :read-del)
176 ((and (eql event-type :write)
177 (not (queue-empty-p (fd-entry-write-events fd-entry))))
178 :write-del)))
181 (defmethod remove-event ((event-base event-base) event)
182 (check-type (event-type event) event-type)
184 (let* ((fd (event-fd event))
185 (current-entry (fd-entry-of event-base fd))
186 (edge-change nil))
187 (if current-entry
188 (progn
189 (setf edge-change (calc-possible-edge-change-when-removing
190 current-entry (event-type event)))
191 (%remove-event event-base event)
192 (if (fd-entry-empty-p current-entry)
193 (unmonitor-fd (mux-of event-base) current-entry)
194 (when edge-change
195 (setf (fd-entry-edge-change current-entry) edge-change)
196 (update-fd (mux-of event-base) current-entry)
197 (setf (fd-entry-edge-change current-entry) nil))))
198 (%remove-event event-base event)))
199 (values event-base))
202 (defmethod remove-events ((event-base event-base) event-list)
203 (dolist (ev event-list)
204 (remove-event event-base ev)))
207 (defmacro with-fd-handler ((event-base fd event-type function
208 &optional timeout)
209 &body body)
210 (let ((event (gensym "EVENT-")))
211 `(let (,event)
212 (unwind-protect
213 (progn
214 (setf ,event (add-fd ,event-base ,fd ,event-type ,function
215 :persistent t
216 :timeout ,timeout))
217 ,@body)
218 (when ,event
219 (remove-event ,event-base ,event))))))
222 (defmethod event-dispatch :around ((event-base event-base) &key timeout only-once)
223 (setf (exit-p event-base) nil)
224 (setf (main-timeout-of event-base) nil)
225 (when timeout
226 (exit-event-loop event-base :delay timeout)
227 (setf only-once t))
228 (unless (event-base-empty-p event-base)
229 (call-next-method event-base :timeout timeout
230 :only-once only-once)))
233 (defmethod event-dispatch ((event-base event-base) &key timeout only-once)
234 (with-accessors ((mux mux-of) (exit-p exit-p)
235 (fds fds-of) (timeouts timeouts-of)) event-base
236 (let* ((min-event-timeout (events-calc-min-rel-timeout timeouts))
237 (actual-timeout (calc-min-timeout min-event-timeout timeout))
238 (before nil)
239 (after nil))
240 (loop
241 :with deletion-list := ()
242 :with dispatch-list := ()
245 (setf before (et::gettime))
246 (mapc #'(lambda (ev)
247 (event-recalc-abs-timeout ev before))
248 (queue-head timeouts))
249 (dispatch-fd-events-once event-base actual-timeout)
250 (setf after (et::gettime))
252 (let ((main-timeout (main-timeout-of event-base)))
253 (when main-timeout
254 (remove-event event-base main-timeout)
255 (setf (main-timeout-of event-base) nil)))
257 (setf (values deletion-list dispatch-list)
258 (filter-expired-events (expired-events timeouts after)))
259 (dispatch-timeouts dispatch-list)
260 (remove-events event-base deletion-list)
262 (queue-sort timeouts #'< #'event-abs-timeout)
264 :when (or only-once
265 exit-p
266 (event-base-empty-p event-base))
267 :do (loop-finish)))))
270 (defun dispatch-fd-events-once (event-base timeout)
271 (with-accessors ((mux mux-of) (fds fds-of)
272 (timeouts timeouts-of)) event-base
273 (let ((deletion-list ())
274 (fd-events (harvest-events mux timeout)))
275 (dolist (ev fd-events)
276 (destructuring-bind (fd ev-types) ev
277 (let ((fd-entry (fd-entry-of event-base fd)))
278 (if fd-entry
279 (progn
280 (when (member :error ev-types)
281 (dispatch-error-events fd-entry)
282 (setf deletion-list (fd-entry-all-events fd-entry)))
283 (when (member :read ev-types)
284 (dispatch-read-events fd-entry)
285 (setf deletion-list
286 (nconc deletion-list
287 (fd-entry-one-shot-events fd-entry :read))))
288 (when (member :write ev-types)
289 (dispatch-write-events fd-entry)
290 (setf deletion-list
291 (nconc deletion-list
292 (fd-entry-one-shot-events fd-entry :write)))))
293 (warn "Got spurious event for non-monitored FD: ~A" fd)))))
294 (dolist (ev deletion-list)
295 (remove-event event-base ev)))))
298 (defun expired-events (queue now)
299 (queue-filter queue
300 #'(lambda (to)
301 (and to (<= to now)))
302 #'event-abs-timeout))
305 (defun filter-expired-events (events)
306 (let ((deletion-list ())
307 (dispatch-list ()))
308 (dolist (ev events)
309 (push ev dispatch-list)
310 (unless (event-persistent-p ev)
311 (push ev deletion-list)))
312 (values deletion-list dispatch-list)))
315 (defun dispatch-timeouts (dispatch-list)
316 (dolist (ev dispatch-list)
317 (funcall (event-handler ev) nil :timeout)))
320 (defun events-calc-min-rel-timeout (timeouts)
321 (let* ((now (et::gettime))
322 (first-valid-event (find-if #'(lambda (to)
323 (or (null to) (< now to)))
324 (queue-head timeouts)
325 :key #'event-abs-timeout)))
326 (when (and first-valid-event
327 (event-abs-timeout first-valid-event))
328 (- (event-abs-timeout first-valid-event) now))))
331 (defun dispatch-error-events (fd-entry)
332 (dolist (ev (queue-head (fd-entry-error-events fd-entry)))
333 (funcall (event-handler ev) (fd-entry-fd fd-entry) :error)))
336 (defun dispatch-read-events (fd-entry)
337 (dolist (ev (queue-head (fd-entry-read-events fd-entry)))
338 (funcall (event-handler ev) (fd-entry-fd fd-entry) :read)))
341 (defun dispatch-write-events (fd-entry)
342 (dolist (ev (queue-head (fd-entry-write-events fd-entry)))
343 (funcall (event-handler ev) (fd-entry-fd fd-entry) :write)))
346 ;;; FD-Entry
348 (deftype fd-event ()
349 '(member :read :write :error))
352 (deftype event-type ()
353 '(or fd-event (member :timeout)))
356 (defstruct (fd-entry
357 (:constructor make-fd-entry (fd))
358 (:copier nil))
359 (fd 0 :type unsigned-byte)
360 (edge-change nil :type symbol)
361 (read-events (make-queue) :type queue)
362 (write-events (make-queue) :type queue)
363 (error-events (make-queue) :type queue))
366 (defun fd-entry-event-list (fd-entry event-type)
367 (check-type fd-entry fd-entry)
368 (check-type event-type fd-event)
369 (case event-type
370 (:read (fd-entry-read-events fd-entry))
371 (:write (fd-entry-write-events fd-entry))
372 (:error (fd-entry-error-events fd-entry))))
375 (defun (setf fd-entry-event-list) (fd-entry event-list event-type)
376 (check-type fd-entry fd-entry)
377 (check-type event-type fd-event)
378 (case event-type
379 (:read (setf (fd-entry-read-events fd-entry) event-list))
380 (:write (setf (fd-entry-write-events fd-entry) event-list))
381 (:error (setf (fd-entry-error-events fd-entry) event-list))))
384 (defun fd-entry-empty-p (fd-entry)
385 (and (queue-empty-p (fd-entry-read-events fd-entry))
386 (queue-empty-p (fd-entry-write-events fd-entry))
387 (queue-empty-p (fd-entry-error-events fd-entry))))
390 (defun fd-entry-add-event (fd-entry event)
391 (queue-enqueue (fd-entry-event-list fd-entry (event-type event))
392 event))
395 (defun fd-entry-del-event (fd-entry event)
396 (queue-delete (fd-entry-event-list fd-entry (event-type event))
397 event))
400 (defun fd-entry-all-events (fd-entry)
401 (append (queue-head (fd-entry-read-events fd-entry))
402 (queue-head (fd-entry-write-events fd-entry))
403 (queue-head (fd-entry-error-events fd-entry))))
406 (defun fd-entry-one-shot-events (fd-entry event-type)
407 (remove-if #'event-persistent-p
408 (queue-head (fd-entry-event-list fd-entry event-type))))
411 ;;; Event
413 (defstruct (event
414 (:constructor make-event (fd type handler persistent-p
415 abs-timeout timeout))
416 (:copier nil))
417 ;; a file descriptor or nil in case of a timeout
418 (fd nil :type (or null unsigned-byte))
419 (type nil :type (or null event-type))
420 (handler nil :type (or null function))
421 ;; if an event is not persistent it is removed
422 ;; after it occurs or if it times out
423 (persistent-p nil :type boolean)
424 (abs-timeout nil :type (or null timeout))
425 (timeout nil :type (or null timeout)))
428 (defun event-timed-out-p (event timeout)
429 (let ((ev-to (event-abs-timeout event)))
430 (when (and ev-to timeout)
431 (< timeout ev-to))))
434 (defun event-recalc-abs-timeout (event now)
435 (setf (event-abs-timeout event)
436 (+ now (event-timeout event))))
439 ;;; Multiplexer
442 (defun get-fd-limit ()
443 (let ((fd-limit (et:get-resource-limit et:rlimit-nofile)))
444 (unless (eql fd-limit et:rlim-infinity)
445 (1- fd-limit))))
448 (defclass multiplexer ()
449 ((fd :reader fd-of)
450 (fd-limit :initform (get-fd-limit)
451 :initarg :fd-limit
452 :reader fd-limit-of)))
455 (defgeneric monitor-fd (mux fd-entry)
456 (:method ((mux multiplexer) fd-entry)
457 (declare (ignore fd-entry))
461 (defgeneric update-fd (mux fd-entry)
462 (:method ((mux multiplexer) fd-entry)
463 (declare (ignore fd-entry))
467 (defgeneric unmonitor-fd (mux fd-entry)
468 (:method ((mux multiplexer) fd-entry)
469 (declare (ignore fd-entry))
473 (defgeneric harvest-events (mux timeout))
476 (defgeneric close-multiplexer (mux)
477 (:method-combination progn :most-specific-last)
478 (:method progn ((mux multiplexer))
479 (when (slot-value mux 'fd)
480 (et:close (fd-of mux))
481 (setf (slot-value mux 'fd) nil))
482 mux))
485 (defmethod monitor-fd :around ((mux multiplexer) fd-entry)
486 (if (ignore-and-print-errors (call-next-method))
488 (warn "FD monitoring failed for FD ~A."
489 (fd-entry-fd fd-entry))))
492 (defmethod update-fd :around ((mux multiplexer) fd-entry)
493 (if (ignore-and-print-errors (call-next-method))
495 (warn "FD status update failed for FD ~A."
496 (fd-entry-fd fd-entry))))
499 (defmethod unmonitor-fd :around ((mux multiplexer) fd-entry)
500 (if (ignore-and-print-errors (call-next-method))
502 (warn "FD unmonitoring failed for FD ~A."
503 (fd-entry-fd fd-entry))))
506 (defmacro define-multiplexer (name priority superclasses slots &rest options)
507 `(progn
508 (defclass ,name ,superclasses ,slots ,@options)
509 (pushnew (cons ,priority ',name)
510 *available-multiplexers*)))
512 ;;;;
513 ;;;; Misc
514 ;;;;
516 ;; FIXME: Until a way to autodetect platform features is implemented
517 (define-constant et::pollrdhup 0)
519 (defun wait-until-fd-ready (fd event-type &optional timeout)
520 ;; FIXME: this conses badly for its return value
521 ;; solution: (1) use a fixnum bitmap, just like C
522 ;; (2) if we really want to expose only lists of keyword as the API,
523 ;; cache a bitmap-indexed vector of all the combinations (sharing tails)
524 (flet ((choose-poll-flags (type)
525 (ecase type
526 (:read (logior et:pollin et::pollrdhup et:pollpri))
527 (:write (logior et:pollout et:pollhup))
528 (:read-write (logior et:pollin et::pollrdhup et:pollpri
529 et:pollout et:pollhup)))))
530 (let ((status ()))
531 (with-foreign-object (pollfd 'et:pollfd)
532 (et:bzero pollfd et:size-of-pollfd)
533 (with-foreign-slots ((et:fd et:events et:revents) pollfd et:pollfd)
534 (setf et:fd fd
535 et:events (choose-poll-flags event-type))
536 (handler-case
537 (let ((ret (et:repeat-upon-eintr
538 (et:poll pollfd 1 (timeout->milisec timeout)))))
539 (when (zerop ret)
540 (return-from wait-until-fd-ready '(:timeout))))
541 (et:unix-error (err)
542 (declare (ignore err))
543 (return-from wait-until-fd-ready '(:error))))
544 (flags-case et:revents
545 ((et:pollout et:pollhup) (push :write status))
546 ((et:pollin et::pollrdhup et:pollpri) (push :read status))
547 ((et:pollerr et:pollnval) (push :error status)))
548 (return-from wait-until-fd-ready status))))))
550 (defun fd-ready-p (fd &optional (event-type :read))
551 (not (member :timeout (wait-until-fd-ready fd event-type 0) :test #'eq)))