From 4cf95143cdf0f8735427e3cc0b35e81939624453 Mon Sep 17 00:00:00 2001 From: Stelian Ionescu Date: Wed, 17 Jan 2007 16:14:46 +0100 Subject: [PATCH] Further improvement of the multiplexers. --- io-multiplex/common.lisp | 53 +++++++++++++++++++-------- io-multiplex/epoll.lisp | 95 ++++++++++++++++++++++++++++++------------------ io-multiplex/select.lisp | 55 +++++++++++++--------------- 3 files changed, 124 insertions(+), 79 deletions(-) diff --git a/io-multiplex/common.lisp b/io-multiplex/common.lisp index 4c80f54..efca047 100644 --- a/io-multiplex/common.lisp +++ b/io-multiplex/common.lisp @@ -93,19 +93,20 @@ (gethash fd (fd-entries mux)))) (defgeneric monitor-fd (mux fd-entry) - (:method-combination progn :most-specific-last)) + (:method ((mux multiplexer) fd-entry) + t)) (defgeneric update-fd (mux fd-entry) - (:method-combination progn :most-specific-last) - (:method progn ((mux multiplexer) fd-entry) + (:method ((mux multiplexer) fd-entry) + t)) + +(defgeneric unmonitor-fd (mux fd &key) + (:method ((mux multiplexer) fd &key) t)) (defgeneric add-fd-handler (mux fd event-type function) (:method-combination progn :most-specific-last)) -(defgeneric unmonitor-fd (mux fd) - (:method-combination progn :most-specific-first)) - (defgeneric remove-fd-handler (mux fd handler) (:method-combination progn :most-specific-first)) @@ -142,11 +143,21 @@ ;;;; Base methods ;;;; -(defmethod monitor-fd progn ((mux multiplexer) fd-entry) +(defmethod monitor-fd :around ((mux multiplexer) fd-entry) (let ((fd (fd-entry-fd fd-entry))) (setf (gethash fd (fd-entries mux)) fd-entry) + (unless (call-next-method) + (unmonitor-fd fd :base-only t) + (error "FD monitoring failed.")) (values fd))) +(defmethod unmonitor-fd :around ((mux multiplexer) fd &key base-only) + (remhash fd (fd-entries mux)) + (unless base-only + (unless (call-next-method) + (error "FD unmonitoring failed."))) + (values fd)) + (defmethod add-fd-handler progn ((mux multiplexer) fd event-type function) (check-type event-type event-type) @@ -154,17 +165,15 @@ (let ((current-entry (fd-entry mux fd)) (handler (make-handler event-type function))) (if current-entry - (push handler (fd-entry-handler-list current-entry event-type)) + (progn + (push handler (fd-entry-handler-list current-entry event-type)) + (update-fd mux current-entry)) (progn (setf current-entry (make-fd-entry fd nil nil nil nil)) (push handler (fd-entry-handler-list current-entry event-type)) (monitor-fd mux current-entry))) (values handler))) -(defmethod unmonitor-fd progn ((mux multiplexer) fd) - (remhash fd (fd-entries mux)) - (values fd)) - (defmethod remove-fd-handler progn ((mux multiplexer) fd handler) (check-type (handler-event-type handler) event-type) @@ -174,8 +183,9 @@ (when current-entry (setf (fd-entry-handler-list current-entry event-type) (delete handler (fd-entry-handler-list current-entry event-type) :test 'eq)) - (when (fd-entry-empty-p current-entry) - (unmonitor-fd mux fd)))) + (if (fd-entry-empty-p current-entry) + (unmonitor-fd mux fd) + (update-fd mux current-entry)))) (values mux)) ;; if there are handlers installed save them and restore them at the end @@ -199,7 +209,7 @@ (defun decode-timeout (timeout) (typecase timeout (integer (values timeout 0)) - (null (values 0 0)) + (null nil) (real (multiple-value-bind (q r) (truncate (coerce timeout 'single-float)) (declare (type unsigned-byte q) (single-float r)) @@ -207,6 +217,19 @@ (t (error "Timeout is not a real number or NIL: ~S" timeout)))) +(defun timeout (sec usec) + (when sec (cons sec usec))) + +(defun timeout-sec (timeout) + (car timeout)) + +(defun timeout-usec (timeout) + (cdr timeout)) + +(defmethod serve-fd-events :around ((mux multiplexer) &key timeout) + (multiple-value-bind (sec usec) (decode-timeout timeout) + (call-next-method mux :timeout (timeout sec usec)))) + (defun wait-until-fd-usable (mux fd event-type &optional timeout) (let (status) (flet ((callback (fd type) diff --git a/io-multiplex/epoll.lisp b/io-multiplex/epoll.lisp index ea9cacb..599dbc8 100644 --- a/io-multiplex/epoll.lisp +++ b/io-multiplex/epoll.lisp @@ -45,7 +45,7 @@ (if (fd-entry-write-handlers fd-entry) et:epollout 0) (if (fd-entry-except-handlers fd-entry) et:epollpri 0))) -(defmethod monitor-fd progn ((mux epoll-multiplexer) fd-entry) +(defmethod monitor-fd ((mux epoll-multiplexer) fd-entry) (assert fd-entry) (let ((flags (calc-epoll-flags fd-entry)) (fd (fd-entry-fd fd-entry))) @@ -55,10 +55,19 @@ (setf (foreign-slot-value (foreign-slot-value ev 'et:epoll-event 'et:data) 'et:epoll-data 'et:fd) fd) - (et:epoll-ctl (epoll-fd mux) et:epoll-ctl-add fd ev)) - (values fd))) + (handler-case + (et:epoll-ctl (epoll-fd mux) et:epoll-ctl-add fd ev) + (et:unix-error-badf (err) + (declare (ignore err)) + (warn "FD ~A is invalid, cannot monitor it." fd) + (return-from monitor-fd nil)) + (et:unix-error-exist (err) + (declare (ignore err)) + (warn "FD ~A is already monitored." fd) + (return-from monitor-fd nil)))) + t)) -(defmethod update-fd progn ((mux epoll-multiplexer) fd-entry) +(defmethod update-fd ((mux epoll-multiplexer) fd-entry) (assert fd-entry) (let ((flags (calc-epoll-flags fd-entry)) (fd (fd-entry-fd fd-entry))) @@ -68,22 +77,38 @@ (setf (foreign-slot-value (foreign-slot-value ev 'et:epoll-event 'et:data) 'et:epoll-data 'et:fd) fd) - (et:epoll-ctl (epoll-fd mux) et:epoll-ctl-mod fd ev)) + (handler-case + (et:epoll-ctl (epoll-fd mux) et:epoll-ctl-mod fd ev) + (et:unix-error-badf (err) + (declare (ignore err)) + (warn "FD ~A is invalid, cannot update its status." fd) + (return-from update-fd nil)) + (et:unix-error-noent (err) + (declare (ignore err)) + (warn "FD ~A was not monitored, cannot update its status." fd) + (return-from update-fd nil)))) (values fd-entry))) -(defmethod unmonitor-fd progn ((mux epoll-multiplexer) fd) - (et:epoll-ctl (epoll-fd mux) - et:epoll-ctl-del - fd - (null-pointer)) - (values fd)) +(defmethod unmonitor-fd ((mux epoll-multiplexer) fd &key) + (handler-case + (et:epoll-ctl (epoll-fd mux) + et:epoll-ctl-del + fd + (null-pointer)) + (et:unix-error-badf (err) + (declare (ignore err)) + (warn "FD ~A is invalid, cannot unmonitor it." fd)) + (et:unix-error-noent (err) + (declare (ignore err)) + (warn "FD ~A was not monitored, cannot unmonitor it." fd))) + t) (defun epoll-serve-single-fd (fd-entry events) (assert fd-entry) - (let ((error-handlers (handler-error-handlers fd-entry)) - (except-handlers (handler-except-handlers fd-entry)) - (read-handlers (handler-read-handlers fd-entry)) - (write-handlers (handler-write-handlers fd-entry)) + (let ((error-handlers (fd-entry-error-handlers fd-entry)) + (except-handlers (fd-entry-except-handlers fd-entry)) + (read-handlers (fd-entry-read-handlers fd-entry)) + (write-handlers (fd-entry-write-handlers fd-entry)) (fd (fd-entry-fd fd-entry))) (when (and error-handlers (logtest et:epollerr events)) (dolist (error-handler error-handlers) @@ -100,26 +125,26 @@ (defmethod serve-fd-events ((mux epoll-multiplexer) &key timeout) - (with-foreign-object (events 'et:epoll-event #.*epoll-max-events*) - (et:memset events 0 #.(* *epoll-max-events* (foreign-type-size 'et:epoll-event))) - (if timeout - (multiple-value-bind - (to-sec to-usec) (decode-timeout timeout) - (setf timeout (+ to-sec (* to-usec 1000)))) - (setf timeout -1)) - (let ((ready-fds - (et:epoll-wait (epoll-fd mux) events - #.*epoll-max-events* timeout))) - (loop - :for i :below ready-fds - :for fd := (foreign-slot-value (foreign-slot-value (mem-aref events 'et:epoll-event i) - 'et:epoll-event 'et:data) - 'et:epoll-data 'et:fd) - :for event-mask := (foreign-slot-value (mem-aref events 'et:epoll-event i) - 'et:epoll-event 'et:events) - :do (epoll-serve-single-fd (fd-entry mux fd) - event-mask)) - (return-from serve-fd-events ready-fds)))) + (let ((milisec-timeout + (if timeout + (+ (* (timeout-sec timeout) 1000) + (truncate (timeout-usec timeout) 1000)) + -1))) + (with-foreign-object (events 'et:epoll-event #.*epoll-max-events*) + (et:memset events 0 #.(* *epoll-max-events* (foreign-type-size 'et:epoll-event))) + (let ((ready-fds + (et:epoll-wait (epoll-fd mux) events + #.*epoll-max-events* milisec-timeout))) + (loop + :for i :below ready-fds + :for fd := (foreign-slot-value (foreign-slot-value (mem-aref events 'et:epoll-event i) + 'et:epoll-event 'et:data) + 'et:epoll-data 'et:fd) + :for event-mask := (foreign-slot-value (mem-aref events 'et:epoll-event i) + 'et:epoll-event 'et:events) + :do (epoll-serve-single-fd (fd-entry mux fd) + event-mask)) + (return-from serve-fd-events ready-fds))))) (defmethod close-multiplexer ((mux epoll-multiplexer)) (cancel-finalization mux) diff --git a/io-multiplex/select.lisp b/io-multiplex/select.lisp index f448b76..f928ca0 100644 --- a/io-multiplex/select.lisp +++ b/io-multiplex/select.lisp @@ -29,7 +29,7 @@ (define-multiplexer select-multiplexer +select-priority+ (multiplexer) ()) -(defun select-setup-masks (select-iface read-fds write-fds except-fds) +(defun select-setup-masks (mux read-fds write-fds except-fds) (declare (type et:foreign-pointer read-fds write-fds except-fds)) @@ -38,7 +38,7 @@ (et:fd-zero except-fds) (let ((max-fd 0)) - (with-hash-table-iterator (next-item (fd-entries select-iface)) + (with-hash-table-iterator (next-item (fd-entries mux)) (multiple-value-bind (item-p fd fd-entry) (next-item) (when item-p (when (> fd max-fd) @@ -51,8 +51,8 @@ (et:fd-set fd except-fds))))) max-fd)) -(defun handle-select-fd-errors (select-iface) - (let ((current-entries (fd-entries select-iface)) +(defun handle-select-fd-errors (mux) + (let ((current-entries (fd-entries mux)) invalid-fd-entries) (with-hash-table-iterator (next-item current-entries) (multiple-value-bind (item-p fd fd-entry) (next-item) @@ -61,10 +61,11 @@ (dolist (fd-entry invalid-fd-entries) (let ((fd (fd-entry-fd fd-entry)) (error-handlers (fd-entry-error-handlers fd-entry))) - (if error-handlers - (dolist (error-handler error-handlers) - (funcall (handler-function error-handler) fd :error)) - (remhash fd current-entries)))))) + (when error-handlers + (dolist (error-handler error-handlers) + (funcall (handler-function error-handler) fd :error))) + (warn "Removing bad FD: ~A from ~A" fd mux) + (unmonitor-fd mux fd :base-only t))))) (defmethod serve-fd-events ((mux select-multiplexer) &key timeout) @@ -92,10 +93,10 @@ (when timeout (progn (et:memset to 0 #.(foreign-type-size 'et:timeval)) - (multiple-value-bind - (to-sec to-usec) (decode-timeout timeout) - (setf (foreign-slot-value to 'et:timeval 'et:tv-sec) to-sec) - (setf (foreign-slot-value to 'et:timeval 'et:tv-usec) to-usec)))) + (setf (foreign-slot-value to 'et:timeval 'et:tv-sec) + (timeout-sec timeout)) + (setf (foreign-slot-value to 'et:timeval 'et:tv-usec) + (timeout-usec timeout)))) (et:select (1+ max-fd) read-fds write-fds @@ -111,21 +112,17 @@ (with-hash-table-iterator (next-item fd-entries) (multiple-value-bind (item-p fd fd-entry) (next-item) (when item-p - (if (fd-open-p fd) - (progn - (incf count) - (when (and (et:fd-isset fd except-fds) - (fd-entry-except-handlers fd-entry)) - (dolist (except-handler (fd-entry-except-handlers fd-entry)) - (funcall (handler-function except-handler) fd :except))) - (when (and (et:fd-isset fd read-fds) - (fd-entry-read-handlers fd-entry)) - (dolist (read-handler (fd-entry-read-handlers fd-entry)) - (funcall (handler-function read-handler) fd :read))) - (when (and (et:fd-isset fd write-fds) - (fd-entry-write-handlers fd-entry)) - (dolist (write-handler (fd-entry-write-handlers fd-entry)) - (funcall (handler-function write-handler) fd :write)))) - ;; TODO: add better error handling - (error "Handler for bad fd is present: ~A " fd))))) + (incf count) + (when (and (et:fd-isset fd except-fds) + (fd-entry-except-handlers fd-entry)) + (dolist (except-handler (fd-entry-except-handlers fd-entry)) + (funcall (handler-function except-handler) fd :except))) + (when (and (et:fd-isset fd read-fds) + (fd-entry-read-handlers fd-entry)) + (dolist (read-handler (fd-entry-read-handlers fd-entry)) + (funcall (handler-function read-handler) fd :read))) + (when (and (et:fd-isset fd write-fds) + (fd-entry-write-handlers fd-entry)) + (dolist (write-handler (fd-entry-write-handlers fd-entry)) + (funcall (handler-function write-handler) fd :write)))))) (return-from serve-fd-events count))))) -- 2.11.4.GIT