From c6267745fdf829c940737180e7201e608f5149b3 Mon Sep 17 00:00:00 2001 From: Stelian Ionescu Date: Sun, 20 Jul 2008 04:03:02 +0200 Subject: [PATCH] Rewrite buffering. Signed-off-by: Stelian Ionescu --- io.streams/zeta/buffer.lisp | 603 +++++++++++++++++++++++--------------------- io.streams/zeta/device.lisp | 14 +- 2 files changed, 319 insertions(+), 298 deletions(-) rewrite io.streams/zeta/buffer.lisp (75%) diff --git a/io.streams/zeta/buffer.lisp b/io.streams/zeta/buffer.lisp dissimilarity index 75% index c70c690..f65d6c4 100644 --- a/io.streams/zeta/buffer.lisp +++ b/io.streams/zeta/buffer.lisp @@ -1,290 +1,313 @@ -;;;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp; indent-tabs-mode: nil -*- -;;; -;;; --- Device buffers. -;;; - -(in-package :io.zeta-streams) - -;;;----------------------------------------------------------------------------- -;;; Buffer Classes and Types -;;;----------------------------------------------------------------------------- - -(defclass buffer (device) - ((single-channel-p :initarg :single-channel :accessor single-channel-buffer-p) - (last-io-op :initform nil :accessor last-io-op-of) - (input-buffer :initarg :input-buffer :accessor input-buffer-of) - (output-buffer :initarg :output-buffer :accessor output-buffer-of) - (synchronized :initarg :synchronized :reader buffer-synchronized-p)) - (:default-initargs :input-buffer nil - :output-buffer nil - :synchronized nil)) - - -;;;----------------------------------------------------------------------------- -;;; Buffer Constructors -;;;----------------------------------------------------------------------------- - -(defmethod initialize-instance :after ((buffer buffer) &key - (single-channel nil single-channel-provided) - input-buffer-size output-buffer-size) - (with-accessors ((single-channel-p single-channel-buffer-p) - (input-handle input-handle-of) - (input-buffer input-buffer-of) - (output-buffer output-buffer-of)) - buffer - (setf single-channel-p (if single-channel-provided - single-channel - (typep input-handle 'single-channel-device))) - (if input-buffer - (check-type input-buffer iobuf) - (setf input-buffer (make-iobuf input-buffer-size))) - (if single-channel-p - (setf output-buffer input-buffer) - (cond - (output-buffer - (check-type output-buffer iobuf) - (assert (not (eq input-buffer output-buffer)))) - (t (setf output-buffer (make-iobuf output-buffer-size))))))) - - -;;;----------------------------------------------------------------------------- -;;; Buffer Generic Functions -;;;----------------------------------------------------------------------------- - -(defgeneric buffer-clear-input (buffer)) - -(defgeneric buffer-clear-output (buffer)) - -(defgeneric buffer-flush-output (buffer &optional timeout)) - - -;;;----------------------------------------------------------------------------- -;;; Buffer DEVICE-CLOSE -;;;----------------------------------------------------------------------------- - -(defmethod device-close ((buffer buffer) &optional abort) - (with-accessors ((input-buffer input-buffer-of) - (output-buffer output-buffer-of)) - buffer - (cond - ((single-channel-buffer-p buffer) - (if (buffer-synchronized-p buffer) - (bt:with-lock-held ((iobuf-lock output-buffer)) - (close-single-channel-buffer buffer abort)) - (close-single-channel-buffer buffer abort))) - (t - (if (buffer-synchronized-p buffer) - (bt:with-lock-held ((iobuf-lock input-buffer)) - (bt:with-lock-held ((iobuf-lock output-buffer)) - (close-dual-channel-buffer buffer abort))) - (close-dual-channel-buffer buffer abort))))) - (values buffer)) - -(defun close-single-channel-buffer (buffer abort) - (unless abort - (flush-output-buffer (output-handle-of buffer) - (output-buffer-of buffer) - 0)) - (device-close (output-handle-of buffer))) - -(defun close-dual-channel-buffer (buffer abort) - (unless abort - (flush-output-buffer (output-handle-of buffer) - (output-buffer-of buffer) - 0)) - (device-close (input-handle-of buffer)) - (device-close (output-handle-of buffer))) - -;;;----------------------------------------------------------------------------- -;;; Buffer DEVICE-READ -;;;----------------------------------------------------------------------------- - -(defmethod device-read ((device buffer) buffer start end &optional timeout) - (when (= start end) (return-from device-read 0)) - (cond - ((buffer-synchronized-p device) - (flet ((%read-octets () - (bt:with-lock-held ((iobuf-lock (input-buffer-of device))) - (read-octets/buffered device buffer start end 0)))) - (let ((nbytes (%read-octets))) - (cond - ((and (not (eql timeout 0)) - (eql nbytes 0)) - (wait-for-input (input-handle-of device) timeout) - (%read-octets)) - (t nbytes))))) - (t - (read-octets/buffered device buffer start end timeout)))) - -(defun read-octets/buffered (device vector start end timeout) - (declare (type buffer device) - (type ub8-simple-vector vector) - (type iobuf-index start end) - (type device-timeout timeout)) - (with-accessors ((input-handle input-handle-of) - (input-buffer input-buffer-of) - (output-handle output-handle-of) - (output-buffer output-buffer-of)) - device - ;; If the previous operation was a write, try to flush the output buffer. - ;; If the buffer couldn't be flushed entirely, signal an error - (synchronize-input device output-handle output-buffer) - (cond - ((iobuf-empty-p input-buffer) - (let ((nbytes - (fill-input-buffer device input-handle input-buffer timeout))) - (if (iobuf-empty-p input-buffer) - (if (eql :eof nbytes) :eof 0) - (iobuf->vector input-buffer vector start end)))) - (t - (iobuf->vector input-buffer vector start end))))) - -(defun synchronize-input (device output-handle output-buffer) - (when (and (single-channel-buffer-p device) - (eql :write (last-io-op-of device))) - (if (plusp (flush-output-buffer output-handle output-buffer 0)) - (error "Could not flush the entire write buffer !") - (iobuf-reset output-buffer)))) - -(defun fill-input-buffer (device input-handle input-buffer timeout) - (multiple-value-bind (data start end) - (iobuf-next-empty-zone input-buffer) - (let ((nbytes - (device-read input-handle data start end timeout))) - (setf (iobuf-end input-buffer) (+ start nbytes)) - (setf (last-io-op-of device) :read) - (values nbytes)))) - -(defun flush-input-buffer (input-buffer) - (prog1 - (iobuf-available-octets input-buffer) - (iobuf-reset input-buffer))) - - -;;;----------------------------------------------------------------------------- -;;; Buffer DEVICE-WRITE -;;;----------------------------------------------------------------------------- - -(defmethod device-write ((device buffer) buffer start end &optional timeout) - (when (= start end) (return-from device-write 0)) - (cond - ((buffer-synchronized-p device) - (flet ((%write-octets () - (bt:with-lock-held ((iobuf-lock (output-buffer-of device))) - (write-octets/buffered device buffer start end 0)))) - (let ((nbytes (%write-octets))) - (cond - ((and (not (eql timeout 0)) - (eql nbytes 0)) - (wait-for-output (output-handle-of device) timeout) - (%write-octets)) - (t nbytes))))) - (t - (write-octets/buffered device buffer start end timeout)))) - -(defun write-octets/buffered (device vector start end timeout) - (declare (type buffer device) - (type ub8-simple-vector vector) - (type iobuf-index start end) - (type device-timeout timeout)) - (with-accessors ((output-handle output-handle-of) - (output-buffer output-buffer-of)) - device - ;; If the previous operation was a read, flush the read buffer - ;; and reposition the file offset accordingly - (synchronize-output device) - (prog1 - (vector->iobuf output-buffer vector start end) - (setf (last-io-op-of device) :write) - (when (iobuf-full-p output-buffer) - (flush-output-buffer output-handle output-buffer timeout))))) - -(defun synchronize-output (device) - (when (and (single-channel-buffer-p device) - (eql :read (last-io-op-of device))) - (let ((nbytes (flush-input-buffer (input-buffer-of device)))) - (unless (zerop nbytes) - (setf (device-position device :from :current) (- nbytes)))))) - -(defun flush-output-buffer (output-handle output-buffer timeout) - (multiple-value-bind (data start end) - (iobuf-next-data-zone output-buffer) - (let ((nbytes - (device-write output-handle data start end timeout))) - (setf (iobuf-start output-buffer) (+ start nbytes)))) - (iobuf-available-octets output-buffer)) - - -;;;----------------------------------------------------------------------------- -;;; Buffer DEVICE-POSITION -;;;----------------------------------------------------------------------------- - -(defmethod device-position ((device buffer)) - (if (buffer-synchronized-p device) - (bt:with-lock-held ((iobuf-lock (input-buffer-of device))) - (get-device-position device)) - (get-device-position device))) - -(defun get-device-position (buffer) - (when-let ((single-channel-p (single-channel-buffer-p buffer)) - (handle-position - (device-position (input-handle-of buffer)))) - (ecase (last-io-op-of buffer) - (:read - (- handle-position (iobuf-available-octets (input-buffer-of buffer)))) - (:write - (+ handle-position (iobuf-available-octets (input-buffer-of buffer))))))) - -(defmethod (setf device-position) (position (device buffer) &key (from :start)) - (setf (device-position device :from from) position)) - - -;;;----------------------------------------------------------------------------- -;;; Buffer cleaning -;;;----------------------------------------------------------------------------- - -(defmethod buffer-clear-input ((buffer buffer)) - (with-accessors ((input-buffer input-buffer-of) - (last-io-op last-io-op-of) - (single-channel-p single-channel-buffer-p)) - buffer - (flet ((%buffer-clear-input () - (when (or (not single-channel-p) - (and single-channel-p - (eql :read last-io-op))) - (iobuf-reset input-buffer)))) - (if (buffer-synchronized-p buffer) - (bt:with-lock-held ((iobuf-lock input-buffer)) - (%buffer-clear-input)) - (%buffer-clear-input))))) - -(defmethod buffer-clear-output ((buffer buffer)) - (with-accessors ((output-buffer output-buffer-of) - (last-io-op last-io-op-of) - (single-channel-p single-channel-buffer-p)) - buffer - (flet ((%buffer-clear-output () - (when (or (not single-channel-p) - (and single-channel-p - (eql :write last-io-op))) - (iobuf-reset output-buffer)))) - (if (buffer-synchronized-p buffer) - (bt:with-lock-held ((iobuf-lock output-buffer)) - (%buffer-clear-output)) - (%buffer-clear-output))))) - -(defmethod buffer-flush-output ((buffer buffer) &optional timeout) - (with-accessors ((output-handle output-handle-of) - (output-buffer output-buffer-of) - (last-io-op last-io-op-of) - (single-channel-p single-channel-buffer-p)) - buffer - (flet ((%buffer-flush-output () - (when (or (not single-channel-p) - (and single-channel-p - (eql :write last-io-op))) - (flush-output-buffer output-handle output-buffer timeout)))) - (if (buffer-synchronized-p buffer) - (bt:with-lock-held ((iobuf-lock output-buffer)) - (%buffer-flush-output)) - (%buffer-flush-output))))) +;;;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp; indent-tabs-mode: nil -*- +;;; +;;; --- Device buffers. +;;; + +(in-package :io.zeta-streams) + +;;;----------------------------------------------------------------------------- +;;; Buffer Classes and Types +;;;----------------------------------------------------------------------------- + +(defclass buffer () + ((synchronized :initarg :synchronized :reader buffer-synchronized-p) + (input-iobuf :initarg :input-buffer :accessor input-iobuf-of) + (output-iobuf :initarg :output-buffer :accessor output-iobuf-of)) + (:default-initargs :synchronized nil)) + +(defclass single-channel-buffer (single-channel-device buffer) + ((last-io-op :initform nil :accessor last-io-op-of))) + +(defclass dual-channel-buffer (dual-channel-device buffer) ()) + + +;;;----------------------------------------------------------------------------- +;;; Buffer Generic Functions +;;;----------------------------------------------------------------------------- + +(defgeneric buffer-clear-input (buffer)) + +(defgeneric buffer-clear-output (buffer)) + +(defgeneric buffer-fill-input (buffer &optional timeout)) + +(defgeneric buffer-flush-output (buffer &optional timeout)) + +;;; Internal functions + +(defgeneric buffer-read-octets (buffer vector start end timeout)) + +(defgeneric buffer-write-octets (buffer vector start end timeout)) + +(defgeneric %buffer-clear-input (buffer)) + +(defgeneric %buffer-fill-input (buffer timeout)) + +(defgeneric %buffer-flush-output (buffer timeout)) + + +;;;----------------------------------------------------------------------------- +;;; Helper macros +;;;----------------------------------------------------------------------------- + +(defmacro with-synchronized-single-channel-buffer ((buffer) &body body) + (with-gensyms (body-fun) + `(flet ((,body-fun () ,@body)) + (if (buffer-synchronized-p ,buffer) + (bt:with-lock-held ((iobuf-lock (input-iobuf-of ,buffer))) + (,body-fun)) + (,body-fun))))) + +(defmacro with-synchronized-dual-channel-buffer ((buffer &optional direction) + &body body) + (with-gensyms (body-fun) + (labels ((make-locks (body direction) + (case direction + (:input + `(bt:with-lock-held ((iobuf-lock (input-iobuf-of ,buffer))) + ,body)) + (:output + `(bt:with-lock-held ((iobuf-lock (output-iobuf-of ,buffer))) + ,body)) + (otherwise + (make-locks (make-locks body :output) :input))))) + `(flet ((,body-fun () ,@body)) + (if (buffer-synchronized-p ,buffer) + ,(make-locks `(,body-fun) direction) + (,body-fun)))))) + + +;;;----------------------------------------------------------------------------- +;;; Buffer Constructors +;;;----------------------------------------------------------------------------- + +(defmethod initialize-instance :after + ((device single-channel-buffer) &key buffer buffer-size) + (with-accessors ((input-iobuf input-iobuf-of) + (output-iobuf output-iobuf-of)) + device + (check-type buffer (or null iobuf)) + (setf input-iobuf (or buffer (make-iobuf buffer-size)) + output-iobuf input-iobuf))) + +(defmethod initialize-instance :after + ((device dual-channel-buffer) &key input-buffer output-buffer + input-buffer-size output-buffer-size) + (with-accessors ((input-iobuf input-iobuf-of) + (output-iobuf output-iobuf-of)) + device + (check-type input-buffer (or null iobuf)) + (check-type output-buffer (or null iobuf)) + (setf input-iobuf (or input-buffer (make-iobuf input-buffer-size))) + (setf output-iobuf (or output-buffer (make-iobuf output-buffer-size))))) + + +;;;----------------------------------------------------------------------------- +;;; Buffer DEVICE-CLOSE +;;;----------------------------------------------------------------------------- + +(defmethod device-close ((buffer single-channel-buffer) &optional abort) + (with-accessors ((handle input-handle-of)) + buffer + (with-synchronized-single-channel-buffer (buffer) + (unless (or abort (eql :read (last-io-op-of buffer))) + (%buffer-flush-output buffer 0)) + (device-close handle))) + (values buffer)) + +(defmethod device-close ((buffer buffer) &optional abort) + (with-accessors ((input-handle input-handle-of buffer) + (output-handle output-handle-of buffer)) + buffer + (with-synchronized-dual-channel-buffer (buffer) + (unless abort + (%buffer-flush-output buffer 0)) + (device-close input-handle) + (device-close output-handle))) + (values buffer)) + +;;;----------------------------------------------------------------------------- +;;; Buffer DEVICE-READ +;;;----------------------------------------------------------------------------- + +(defmethod device-read ((buffer single-channel-buffer) vector start end + &optional timeout) + (when (= start end) (return-from device-read 0)) + (with-synchronized-single-channel-buffer (buffer) + ;; If the previous operation was a write, try to flush the output buffer. + ;; If the buffer couldn't be flushed entirely, signal an error + (synchronize-input buffer) + (buffer-read-octets buffer buffer start end timeout))) + +(defmethod device-read ((buffer dual-channel-buffer) vector start end + &optional timeout) + (when (= start end) (return-from device-read 0)) + (with-synchronized-dual-channel-buffer (buffer :input) + (buffer-read-octets buffer buffer start end timeout))) + +(defmethod buffer-read-octets ((buffer buffer) vector start end timeout) + (with-accessors ((input-handle input-handle-of) + (input-iobuf input-iobuf-of) + (output-handle output-handle-of) + (output-iobuf output-iobuf-of)) + buffer + (cond + ((iobuf-empty-p input-iobuf) + (let ((nbytes + (%buffer-fill-input buffer timeout))) + (if (iobuf-empty-p input-iobuf) + (if (eql :eof nbytes) :eof 0) + (iobuf->vector input-iobuf vector start end)))) + (t + (iobuf->vector input-iobuf vector start end))))) + + +;;;----------------------------------------------------------------------------- +;;; Buffer DEVICE-WRITE +;;;----------------------------------------------------------------------------- + +(defmethod device-write ((buffer single-channel-buffer) vector start end + &optional timeout) + (when (= start end) (return-from device-write 0)) + (with-synchronized-single-channel-buffer (buffer) + ;; If the previous operation was a read, flush the read buffer + ;; and reposition the file offset accordingly + (%buffer-clear-input buffer) + (buffer-write-octets buffer vector start end timeout))) + +(defmethod device-write ((buffer dual-channel-buffer) vector start end + &optional timeout) + (when (= start end) (return-from device-write 0)) + (with-synchronized-dual-channel-buffer (buffer :output) + (buffer-write-octets buffer vector start end timeout))) + +(defmethod buffer-write-octets ((buffer buffer) vector start end timeout) + (with-accessors ((output-handle output-handle-of) + (output-iobuf output-iobuf-of)) + buffer + (prog1 + (vector->iobuf output-iobuf vector start end) + (setf (last-io-op-of buffer) :write) + (when (iobuf-full-p output-iobuf) + (%buffer-flush-output buffer timeout))))) + + +;;;----------------------------------------------------------------------------- +;;; Buffer DEVICE-POSITION +;;;----------------------------------------------------------------------------- + +(defmethod device-position ((buffer single-channel-buffer)) + (with-synchronized-single-channel-buffer (buffer) + (%buffer-position buffer))) + +(defun %buffer-position (buffer) + (let ((position (device-position (input-handle-of buffer)))) + (ecase (last-io-op-of buffer) + (:read + (- position (iobuf-available-octets (input-iobuf-of buffer)))) + (:write + (+ position (iobuf-available-octets (output-iobuf-of buffer))))))) + +(defmethod (setf device-position) (position (buffer single-channel-buffer) &key (from :start)) + (setf (%buffer-position buffer from) position)) + +(defun (setf %buffer-position) (position buffer from) + (setf (device-position (input-handle-of buffer) :from from) position)) + + +;;;----------------------------------------------------------------------------- +;;; BUFFER CLEAR-INPUT +;;;----------------------------------------------------------------------------- + +(defmethod buffer-clear-input ((buffer single-channel-buffer)) + (with-synchronized-single-channel-buffer (buffer) + (%buffer-clear-input buffer))) + +(defmethod %buffer-clear-input ((buffer single-channel-buffer)) + (when (eql :read (last-io-op-of buffer)) + (let ((nbytes (iobuf-available-octets (input-iobuf-of buffer)))) + (unless (zerop nbytes) + (setf (%buffer-position buffer :current) (- nbytes))) + (iobuf-reset (input-iobuf-of buffer))))) + +(defmethod buffer-clear-input ((buffer buffer)) + (with-synchronized-dual-channel-buffer (buffer :input) + (%buffer-clear-input buffer))) + +(defmethod %buffer-clear-input ((buffer dual-channel-buffer)) + (iobuf-reset (input-iobuf-of buffer))) + + +;;;----------------------------------------------------------------------------- +;;; BUFFER CLEAR-OUTPUT +;;;----------------------------------------------------------------------------- + +(defmethod buffer-clear-output ((buffer single-channel-buffer)) + (with-synchronized-single-channel-buffer (buffer) + (when (eql :write (last-io-op-of buffer)) + (iobuf-reset (output-iobuf-of buffer))))) + +(defmethod buffer-clear-output ((buffer dual-channel-buffer)) + (with-synchronized-dual-channel-buffer (buffer :output) + (iobuf-reset (output-iobuf-of buffer)))) + + +;;;----------------------------------------------------------------------------- +;;; BUFFER FILL-INPUT +;;;----------------------------------------------------------------------------- + +(defmethod buffer-fill-input ((buffer single-channel-buffer) &optional timeout) + (with-synchronized-single-channel-buffer (buffer) + ;; If the previous operation was a write, try to flush the output buffer. + ;; If the buffer couldn't be flushed entirely, signal an error + (synchronize-input buffer) + (%buffer-fill-input buffer timeout))) + +(defun synchronize-input (buffer) + (when (and (eql :write (last-io-op-of buffer)) + (plusp (%buffer-flush-output buffer 0))) + ;; FIXME: What do we do now ??? + (error "Could not flush the entire write buffer !")) + (iobuf-reset (output-iobuf-of buffer))) + +(defmethod buffer-fill-input ((buffer dual-channel-buffer) &optional timeout) + (with-synchronized-dual-channel-buffer (buffer :input) + (%buffer-fill-input buffer timeout))) + +(defmethod %buffer-fill-input ((buffer buffer) timeout) + (with-accessors ((input-handle input-handle-of) + (input-iobuf input-iobuf-of)) + buffer + (multiple-value-bind (data start end) + (iobuf-next-empty-zone input-iobuf) + (let ((nbytes + (device-read input-handle data start end timeout))) + (setf (iobuf-end input-iobuf) (+ start nbytes)) + (setf (last-io-op-of buffer) :read) + (values nbytes))))) + + +;;;----------------------------------------------------------------------------- +;;; BUFFER FLUSH-OUTPUT +;;;----------------------------------------------------------------------------- + +(defmethod buffer-flush-output ((buffer single-channel-buffer) &optional timeout) + (with-synchronized-single-channel-buffer (buffer) + (when (eql :write (last-io-op-of buffer)) + (%buffer-flush-output buffer timeout)))) + +(defmethod buffer-flush-output ((buffer dual-channel-buffer) &optional timeout) + (with-synchronized-dual-channel-buffer (buffer :output) + (%buffer-flush-output buffer timeout))) + +(defmethod %buffer-flush-output ((buffer dual-channel-buffer) timeout) + (with-accessors ((output-handle output-handle-of) + (output-iobuf output-iobuf-of)) + buffer + (multiple-value-bind (data start end) + (iobuf-next-data-zone output-iobuf) + (let ((nbytes + (device-write output-handle data start end timeout))) + (setf (iobuf-start output-iobuf) (+ start nbytes)) + (setf (last-io-op-of buffer) :write) + (iobuf-available-octets output-iobuf))))) diff --git a/io.streams/zeta/device.lisp b/io.streams/zeta/device.lisp index 93e9fe5..d5f7e63 100644 --- a/io.streams/zeta/device.lisp +++ b/io.streams/zeta/device.lisp @@ -127,10 +127,9 @@ (defmethod device-read ((device device) vector start end &optional timeout) (when (= start end) (return-from device-read 0)) - (with-device (device) - (if (and timeout (zerop timeout)) - (read-octets/non-blocking device vector start end) - (read-octets/timeout device vector start end timeout)))) + (if (and timeout (zerop timeout)) + (read-octets/non-blocking device vector start end) + (read-octets/timeout device vector start end timeout))) (defun read-octets/non-blocking (device vector start end) (declare (type device device) @@ -174,10 +173,9 @@ (defmethod device-write ((device device) vector start end &optional timeout) (when (= start end) (return-from device-write 0)) - (with-device (device) - (if (and timeout (zerop timeout)) - (write-octets/non-blocking device vector start end) - (write-octets/timeout device vector start end timeout)))) + (if (and timeout (zerop timeout)) + (write-octets/non-blocking device vector start end) + (write-octets/timeout device vector start end timeout))) (defun write-octets/non-blocking (device vector start end) (declare (type device device) -- 2.11.4.GIT