From bc85f4928bc19db379af4802d23050ed2c6c7cef Mon Sep 17 00:00:00 2001 From: Stelian Ionescu Date: Tue, 15 Jul 2008 10:08:52 +0200 Subject: [PATCH] Add write buffering, code cleanup. Signed-off-by: Stelian Ionescu --- io.streams/zeta/buffer.lisp | 119 ++++++++++++++++++++++++++++-------------- io.streams/zeta/classes.lisp | 14 ++--- io.streams/zeta/common.lisp | 64 +++++++++-------------- io.streams/zeta/internal.lisp | 31 ++++++++--- 4 files changed, 134 insertions(+), 94 deletions(-) diff --git a/io.streams/zeta/buffer.lisp b/io.streams/zeta/buffer.lisp index bd56516..8069783 100644 --- a/io.streams/zeta/buffer.lisp +++ b/io.streams/zeta/buffer.lisp @@ -5,71 +5,112 @@ (in-package :io.zeta-streams) -(defclass filter (dual-channel-device) ()) +;;;----------------------------------------------------------------------------- +;;; Classes +;;;----------------------------------------------------------------------------- -(defclass device-buffer (filter) - ((input-buffer :initarg :input-buffer :accessor input-buffer-of) +(defclass buffer (device) + ((single-channel :initarg :single-channel :accessor single-channel-buffer-p) + (input-buffer :initarg :input-buffer :accessor input-buffer-of) (output-buffer :initarg :output-buffer :accessor output-buffer-of))) + + +;;;----------------------------------------------------------------------------- +;;; Constructors +;;;----------------------------------------------------------------------------- -(defmethod initialize-instance :after ((filter filter) &key +(defmethod initialize-instance :after ((buffer buffer) &key single-channel input-buffer-size output-buffer-size) - (if (input-buffer-of filter) - (check-type (input-buffer-of filter) iobuf) - (setf (input-buffer-of filter) (make-iobuf input-buffer-size))) - (if (output-buffer-of filter) - (check-type (output-buffer-of filter) iobuf) - (setf (output-buffer-of filter) (make-iobuf output-buffer-size)))) + (if (input-buffer-of buffer) + (check-type (input-buffer-of buffer) iobuf) + (setf (input-buffer-of buffer) (make-iobuf input-buffer-size))) + (unless single-channel + (if (output-buffer-of buffer) + (check-type (output-buffer-of buffer) iobuf) + (setf (output-buffer-of buffer) (make-iobuf output-buffer-size))))) + + +;;;----------------------------------------------------------------------------- +;;; Generic functions +;;;----------------------------------------------------------------------------- + +(defgeneric buffer-clear-input (buffer)) + +(defgeneric buffer-clear-output (buffer)) + +(defgeneric buffer-flush-output (buffer &optional timeout)) ;;;----------------------------------------------------------------------------- ;;; Buffered DEVICE-READ ;;;----------------------------------------------------------------------------- -(defmethod device-read ((device device-buffer) buffer start end &optional (timeout nil timeoutp)) +(defmethod device-read ((device buffer) buffer start end &optional timeout) (when (= start end) (return-from device-read 0)) - (let* ((timeout (if timeoutp timeout (input-timeout-of (input-handle-of device)))) - (nbytes (read-octets/buffered (input-handle-of device) buffer start end timeout))) - (cond - ((eql :eof nbytes) (return-from device-read :eof)) - ((plusp nbytes) (incf (device-position device) nbytes))) - (values nbytes))) + (read-octets/buffered (input-handle-of device) buffer start end timeout)) -(defun fill-input-buffer (input-handle input-buffer timeout) - (declare (type device input-handle) - (type iobuf input-buffer) - (type device-timeout timeout)) - (device-read input-handle (iobuf-data input-buffer) - (iobuf-end input-buffer) (iobuf-size input-buffer) - timeout)) - -(defun read-octets/buffered (device buffer start end timeout) - (declare (type device-buffer device) - (type iobuf-buffer buffer) +(defun read-octets/buffered (buffer array start end timeout) + (declare (type buffer buffer) + (type iobuf-data-array array) (type iobuf-index start end) (type device-timeout timeout)) (with-accessors ((input-handle input-handle-of) (input-buffer input-buffer-of)) - device + buffer (cond ((iobuf-empty-p input-buffer) - (iobuf-reset input-buffer) (let ((nbytes (fill-input-buffer input-handle input-buffer timeout))) (if (iobuf-empty-p input-buffer) (if (eql :eof nbytes) :eof 0) - (iobuf->array buffer input-buffer start end)))) + (iobuf->array input-buffer array start end)))) (t - (iobuf->array buffer input-buffer start end))))) + (iobuf->array input-buffer array start end))))) + +(defun fill-input-buffer (input-handle input-buffer timeout) + (multiple-value-bind (data start end) + (iobuf-next-empty-zone input-buffer) + (device-read input-handle data start end timeout))) ;;;----------------------------------------------------------------------------- ;;; Buffered DEVICE-WRITE ;;;----------------------------------------------------------------------------- -(defmethod device-write ((device device-buffer) buffer start end &optional (timeout nil timeoutp)) +(defmethod device-write ((device buffer) buffer start end &optional timeout) (when (= start end) (return-from device-write 0)) - (let* ((timeout (if timeoutp timeout (output-timeout-of (output-handle-of device)))) - (nbytes (write-octets/buffered (output-handle-of device) buffer start end timeout))) - (cond - ((eql :eof nbytes) (return-from device-write :eof)) - ((plusp nbytes) (incf (device-position device) nbytes))) - (values nbytes))) + (write-octets/buffered (output-handle-of device) buffer start end timeout)) + +(defun write-octets/buffered (buffer array start end timeout) + (declare (type buffer buffer) + (type iobuf-data-array array) + (type iobuf-index start end) + (type device-timeout timeout)) + (with-accessors ((output-handle output-handle-of) + (output-buffer output-buffer-of)) + buffer + (array->iobuf output-buffer array start end) + (when (iobuf-full-p output-buffer) + (flush-output-buffer output-handle output-buffer timeout)))) + +(defun flush-output-buffer (output-handle output-buffer timeout) + (multiple-value-bind (data start end) + (iobuf-next-data-zone output-buffer) + (device-write output-handle data start end timeout))) + + +;;;----------------------------------------------------------------------------- +;;; Buffer cleaning +;;;----------------------------------------------------------------------------- + +(defmethod buffer-clear-input ((buffer buffer)) + (iobuf-reset (input-buffer-of buffer))) + +(defmethod buffer-clear-output ((buffer buffer)) + (iobuf-reset (output-buffer-of buffer))) + +(defmethod buffer-flush-output ((buffer buffer) &optional timeout) + (with-accessors ((output-handle output-handle-of) + (output-buffer output-buffer-of)) + buffer + (flush-output-buffer output-handle output-buffer timeout) + (iobuf-available-octets output-buffer))) diff --git a/io.streams/zeta/classes.lisp b/io.streams/zeta/classes.lisp index 1a0f561..8bb5333 100644 --- a/io.streams/zeta/classes.lisp +++ b/io.streams/zeta/classes.lisp @@ -36,9 +36,7 @@ (defclass device () ((input-handle :initarg :input-handle :accessor input-handle-of) - (input-timeout :initarg :input-timeout :accessor input-timeout-of) - (output-handle :initarg :output-handle :accessor output-handle-of) - (output-timeout :initarg :output-timeout :accessor output-timeout-of)) + (output-handle :initarg :output-handle :accessor output-handle-of)) (:default-initargs :input-timeout nil :output-timeout nil)) @@ -74,15 +72,9 @@ (defgeneric device-close (device)) -(defgeneric device-read (device buffer start end &optional timeout)) +(defgeneric device-read (device array start end &optional timeout)) -(defgeneric device-write (device buffer start end &optional timeout)) - -(defgeneric device-clear-input (device)) - -(defgeneric device-clear-output (device)) - -(defgeneric device-flush-output (device &optional timeout)) +(defgeneric device-write (device array start end &optional timeout)) (defgeneric device-position (device)) diff --git a/io.streams/zeta/common.lisp b/io.streams/zeta/common.lisp index ddab701..a4e15ce 100644 --- a/io.streams/zeta/common.lisp +++ b/io.streams/zeta/common.lisp @@ -9,16 +9,6 @@ ;;; Default no-op methods ;;;----------------------------------------------------------------------------- -(defmethod device-clear-input ((device device)) - (values device)) - -(defmethod device-clear-output ((device device)) - (values device)) - -(defmethod device-flush-output ((device device) &optional timeout) - (declare (ignore timeout)) - (values device)) - (defmethod device-position ((device device)) (values nil)) @@ -52,22 +42,22 @@ ;;; Default DEVICE-READ ;;;----------------------------------------------------------------------------- -(defmethod device-read ((device device) buffer start end &optional (timeout nil timeoutp)) +(defmethod device-read ((device device) array start end &optional timeout) (when (= start end) (return-from device-read 0)) - (let* ((timeout (if timeoutp timeout (input-timeout-of device))) - (nbytes (if (and timeout (zerop timeout)) - (read-octets/non-blocking (input-handle-of device) buffer start end) - (read-octets/timeout (input-handle-of device) buffer start end timeout)))) + (let ((nbytes (if (and timeout (zerop timeout)) + (read-octets/non-blocking (input-handle-of device) array start end) + (read-octets/timeout (input-handle-of device) array start end timeout)))) (cond ((eql :eof nbytes) (return-from device-read :eof)) - ((plusp nbytes) (incf (device-position device) nbytes))) + ((and (plusp nbytes) (typep device 'single-channel-device)) + (incf (device-position device) nbytes))) (values nbytes))) -(defun read-octets/non-blocking (input-handle buffer start end) +(defun read-octets/non-blocking (input-handle array start end) (declare (type unsigned-byte input-handle) - (type iobuf-buffer buffer) + (type iobuf-data-array array) (type iobuf-index start end)) - (with-pointer-to-vector-data (buf buffer) + (with-pointer-to-vector-data (buf array) (handler-case (nix:repeat-upon-eintr (nix:read input-handle (inc-pointer buf start) (- end start))) @@ -75,12 +65,12 @@ (:no-error (nbytes) (if (zerop nbytes) :eof nbytes))))) -(defun read-octets/timeout (input-handle buffer start end timeout) +(defun read-octets/timeout (input-handle array start end timeout) (declare (type unsigned-byte input-handle) - (type iobuf-buffer buffer) + (type iobuf-data-array array) (type iobuf-index start end) (type device-timeout timeout)) - (with-pointer-to-vector-data (buf buffer) + (with-pointer-to-vector-data (buf array) (nix:repeat-decreasing-timeout (remaining timeout :rloop) (flet ((check-timeout () (if (plusp remaining) @@ -98,35 +88,34 @@ ;;; Default DEVICE-WRITE ;;;----------------------------------------------------------------------------- -(defmethod device-write ((device device) buffer start end &optional (timeout nil timeoutp)) +(defmethod device-write ((device device) array start end &optional timeout) (when (= start end) (return-from device-write 0)) - (let* ((timeout (if timeoutp timeout (output-timeout-of device))) - (nbytes (if (and timeout (zerop timeout)) - (write-octets/non-blocking (output-handle-of device) buffer start end) - (write-octets/timeout (output-handle-of device) buffer start end timeout)))) + (let ((nbytes (if (and timeout (zerop timeout)) + (write-octets/non-blocking (output-handle-of device) array start end) + (write-octets/timeout (output-handle-of device) array start end timeout)))) (cond ((eql :eof nbytes) (return-from device-write :eof)) - ((plusp nbytes) (incf (device-position device) nbytes))) + ((and (plusp nbytes) (typep device 'single-channel-device)) + (incf (device-position device) nbytes))) (values nbytes))) -(defun write-octets/non-blocking (output-handle buffer start end) +(defun write-octets/non-blocking (output-handle array start end) (declare (type unsigned-byte output-handle) - (type iobuf-buffer buffer) + (type iobuf-data-array array) (type iobuf-index start end)) - (with-pointer-to-vector-data (buf buffer) + (with-pointer-to-vector-data (buf array) (handler-case (osicat-posix:repeat-upon-eintr (nix:write output-handle (inc-pointer buf start) (- end start))) (nix:ewouldblock () 0) - (:no-error (nbytes) - (if (zerop nbytes) :eof nbytes))))) + (nix:epipe () :eof)))) -(defun write-octets/timeout (output-handle buffer start end timeout) +(defun write-octets/timeout (output-handle array start end timeout) (declare (type unsigned-byte output-handle) - (type iobuf-buffer buffer) + (type iobuf-data-array array) (type iobuf-index start end) (type device-timeout timeout)) - (with-pointer-to-vector-data (buf buffer) + (with-pointer-to-vector-data (buf array) (nix:repeat-decreasing-timeout (remaining timeout :rloop) (flet ((check-timeout () (if (plusp remaining) @@ -136,5 +125,4 @@ (nix:write output-handle (inc-pointer buf start) (- end start)) (nix:eintr () (check-timeout)) (nix:ewouldblock () (check-timeout)) - (:no-error (nbytes) - (if (zerop nbytes) :eof nbytes))))))) + (nix:epipe () :eof)))))) diff --git a/io.streams/zeta/internal.lisp b/io.streams/zeta/internal.lisp index dba4183..9b02c13 100644 --- a/io.streams/zeta/internal.lisp +++ b/io.streams/zeta/internal.lisp @@ -16,12 +16,12 @@ (deftype iobuf-index () '(unsigned-byte 27)) (deftype iobuf-length () '(integer 0 #.(expt 2 27))) -(deftype iobuf-buffer () 'ub8-sarray) +(deftype iobuf-data-array () 'ub8-sarray) (defparameter *empty-array* (make-array 0 :element-type 'ub8)) (defstruct (iobuf (:constructor %make-iobuf ())) - (data *empty-array* :type iobuf-buffer) + (data *empty-array* :type iobuf-data-array) (start 0 :type iobuf-index) (end 0 :type iobuf-index)) @@ -47,10 +47,25 @@ (= (iobuf-start iobuf) (iobuf-end iobuf))) +(defun iobuf-full-p (iobuf) + (declare (type iobuf iobuf)) + (= (iobuf-end iobuf) + (iobuf-size iobuf))) + (defun iobuf-reset (iobuf) (declare (type iobuf iobuf)) (setf (iobuf-start iobuf) 0 (iobuf-end iobuf) 0)) + +(defun iobuf-next-data-zone (iobuf) + (values (iobuf-data iobuf) + (iobuf-start iobuf) + (iobuf-end iobuf))) + +(defun iobuf-next-empty-zone (iobuf) + (values (iobuf-data iobuf) + (iobuf-end iobuf) + (iobuf-size iobuf))) ;;; @@ -83,7 +98,7 @@ (incf (iobuf-end iobuf))))) (defun replace-ub8 (destination source start1 end1 start2 end2) - (declare (type iobuf-buffer destination source) + (declare (type iobuf-data-array destination source) (type iobuf-index start1 start2 end1 end2)) (let ((nbytes (min (- end1 start1) (- end2 start2)))) @@ -92,10 +107,12 @@ :start2 start2 :end2 end2) (values destination nbytes))) -(defun iobuf->array (array iobuf start end) - (declare (type iobuf-buffer array) +(defun iobuf->array (iobuf array start end) + (declare (type iobuf-data-array array) (type iobuf iobuf) (type iobuf-index start end)) + (when (iobuf-empty-p iobuf) + (iobuf-reset iobuf)) (let ((nbytes (nth-value 1 (replace-ub8 array (iobuf-data iobuf) start end @@ -105,9 +122,11 @@ (values nbytes))) (defun array->iobuf (iobuf array start end) - (declare (type iobuf-buffer array) + (declare (type iobuf-data-array array) (type iobuf iobuf) (type iobuf-index start end)) + (when (iobuf-empty-p iobuf) + (iobuf-reset iobuf)) (let ((nbytes (nth-value 1 (replace-ub8 (iobuf-data iobuf) array (iobuf-start iobuf) -- 2.11.4.GIT