From 374aa56af2fd692f072540d2e1d0ce9afd9838b3 Mon Sep 17 00:00:00 2001 From: Stelian Ionescu Date: Tue, 15 Jul 2008 04:02:34 +0200 Subject: [PATCH] Add buffering. Signed-off-by: Stelian Ionescu --- io.streams/zeta/buffer.lisp | 75 +++++++++++++++++++++++++++++++++++++++++++ io.streams/zeta/classes.lisp | 3 ++ io.streams/zeta/common.lisp | 72 ++++++++++++++++++++++------------------- io.streams/zeta/internal.lisp | 74 ++++++++++++++++++++++++++++++------------ io.zeta-streams.asd | 5 ++- 5 files changed, 174 insertions(+), 55 deletions(-) create mode 100644 io.streams/zeta/buffer.lisp diff --git a/io.streams/zeta/buffer.lisp b/io.streams/zeta/buffer.lisp new file mode 100644 index 0000000..bd56516 --- /dev/null +++ b/io.streams/zeta/buffer.lisp @@ -0,0 +1,75 @@ +;;;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp; indent-tabs-mode: nil -*- +;;; +;;; --- Device buffers. +;;; + +(in-package :io.zeta-streams) + +(defclass filter (dual-channel-device) ()) + +(defclass device-buffer (filter) + ((input-buffer :initarg :input-buffer :accessor input-buffer-of) + (output-buffer :initarg :output-buffer :accessor output-buffer-of))) + +(defmethod initialize-instance :after ((filter filter) &key + input-buffer-size output-buffer-size) + (if (input-buffer-of filter) + (check-type (input-buffer-of filter) iobuf) + (setf (input-buffer-of filter) (make-iobuf input-buffer-size))) + (if (output-buffer-of filter) + (check-type (output-buffer-of filter) iobuf) + (setf (output-buffer-of filter) (make-iobuf output-buffer-size)))) + + +;;;----------------------------------------------------------------------------- +;;; Buffered DEVICE-READ +;;;----------------------------------------------------------------------------- + +(defmethod device-read ((device device-buffer) buffer start end &optional (timeout nil timeoutp)) + (when (= start end) (return-from device-read 0)) + (let* ((timeout (if timeoutp timeout (input-timeout-of (input-handle-of device)))) + (nbytes (read-octets/buffered (input-handle-of device) buffer start end timeout))) + (cond + ((eql :eof nbytes) (return-from device-read :eof)) + ((plusp nbytes) (incf (device-position device) nbytes))) + (values nbytes))) + +(defun fill-input-buffer (input-handle input-buffer timeout) + (declare (type device input-handle) + (type iobuf input-buffer) + (type device-timeout timeout)) + (device-read input-handle (iobuf-data input-buffer) + (iobuf-end input-buffer) (iobuf-size input-buffer) + timeout)) + +(defun read-octets/buffered (device buffer start end timeout) + (declare (type device-buffer device) + (type iobuf-buffer buffer) + (type iobuf-index start end) + (type device-timeout timeout)) + (with-accessors ((input-handle input-handle-of) + (input-buffer input-buffer-of)) + device + (cond + ((iobuf-empty-p input-buffer) + (iobuf-reset input-buffer) + (let ((nbytes (fill-input-buffer input-handle input-buffer timeout))) + (if (iobuf-empty-p input-buffer) + (if (eql :eof nbytes) :eof 0) + (iobuf->array buffer input-buffer start end)))) + (t + (iobuf->array buffer input-buffer start end))))) + + +;;;----------------------------------------------------------------------------- +;;; Buffered DEVICE-WRITE +;;;----------------------------------------------------------------------------- + +(defmethod device-write ((device device-buffer) buffer start end &optional (timeout nil timeoutp)) + (when (= start end) (return-from device-write 0)) + (let* ((timeout (if timeoutp timeout (output-timeout-of (output-handle-of device)))) + (nbytes (write-octets/buffered (output-handle-of device) buffer start end timeout))) + (cond + ((eql :eof nbytes) (return-from device-write :eof)) + ((plusp nbytes) (incf (device-position device) nbytes))) + (values nbytes))) diff --git a/io.streams/zeta/classes.lisp b/io.streams/zeta/classes.lisp index b67c79f..1a0f561 100644 --- a/io.streams/zeta/classes.lisp +++ b/io.streams/zeta/classes.lisp @@ -23,6 +23,9 @@ (deftype ub16-sarray (&optional (size '*)) `(simple-array ub16 (,size))) + +(deftype device-timeout () + `(or null non-negative-real)) ;;;----------------------------------------------------------------------------- diff --git a/io.streams/zeta/common.lisp b/io.streams/zeta/common.lisp index 7a2423d..ddab701 100644 --- a/io.streams/zeta/common.lisp +++ b/io.streams/zeta/common.lisp @@ -56,35 +56,38 @@ (when (= start end) (return-from device-read 0)) (let* ((timeout (if timeoutp timeout (input-timeout-of device))) (nbytes (if (and timeout (zerop timeout)) - (read-octets-non-blocking (input-handle-of device) buffer start end) - (read-octets-with-timeout (input-handle-of device) buffer start end timeout)))) - (when (plusp nbytes) (incf (device-position device) nbytes)) + (read-octets/non-blocking (input-handle-of device) buffer start end) + (read-octets/timeout (input-handle-of device) buffer start end timeout)))) + (cond + ((eql :eof nbytes) (return-from device-read :eof)) + ((plusp nbytes) (incf (device-position device) nbytes))) (values nbytes))) -(defun read-octets-non-blocking (fd buffer start end) - (declare (type unsigned-byte fd) - (type ub8-sarray buffer) - (type unsigned-byte start end)) +(defun read-octets/non-blocking (input-handle buffer start end) + (declare (type unsigned-byte input-handle) + (type iobuf-buffer buffer) + (type iobuf-index start end)) (with-pointer-to-vector-data (buf buffer) (handler-case (nix:repeat-upon-eintr - (nix:read fd (inc-pointer buf start) (- end start))) + (nix:read input-handle (inc-pointer buf start) (- end start))) (nix:ewouldblock () 0) (:no-error (nbytes) (if (zerop nbytes) :eof nbytes))))) -(defun read-octets-with-timeout (fd buffer start end timeout) - (declare (type unsigned-byte fd) - (type ub8-sarray buffer) - (type unsigned-byte start end)) +(defun read-octets/timeout (input-handle buffer start end timeout) + (declare (type unsigned-byte input-handle) + (type iobuf-buffer buffer) + (type iobuf-index start end) + (type device-timeout timeout)) (with-pointer-to-vector-data (buf buffer) - (nix:repeat-decreasing-timeout (remaining timeout nil) + (nix:repeat-decreasing-timeout (remaining timeout :rloop) (flet ((check-timeout () (if (plusp remaining) - (iomux:wait-until-fd-ready fd :input remaining) - (return 0)))) + (iomux:wait-until-fd-ready input-handle :input remaining) + (return-from :rloop 0)))) (handler-case - (nix:read fd (inc-pointer buf start) (- end start)) + (nix:read input-handle (inc-pointer buf start) (- end start)) (nix:eintr () (check-timeout)) (nix:ewouldblock () (check-timeout)) (:no-error (nbytes) @@ -99,36 +102,39 @@ (when (= start end) (return-from device-write 0)) (let* ((timeout (if timeoutp timeout (output-timeout-of device))) (nbytes (if (and timeout (zerop timeout)) - (write-octets-non-blocking (output-handle-of device) buffer start end) - (write-octets-with-timeout (output-handle-of device) buffer start end timeout)))) - (when (plusp nbytes) (incf (device-position device) nbytes)) + (write-octets/non-blocking (output-handle-of device) buffer start end) + (write-octets/timeout (output-handle-of device) buffer start end timeout)))) + (cond + ((eql :eof nbytes) (return-from device-write :eof)) + ((plusp nbytes) (incf (device-position device) nbytes))) (values nbytes))) -(defun write-octets-non-blocking (fd buffer start end) - (declare (type unsigned-byte fd) - (type ub8-sarray buffer) - (type unsigned-byte start end)) +(defun write-octets/non-blocking (output-handle buffer start end) + (declare (type unsigned-byte output-handle) + (type iobuf-buffer buffer) + (type iobuf-index start end)) (with-pointer-to-vector-data (buf buffer) (handler-case (osicat-posix:repeat-upon-eintr - (nix:write fd (inc-pointer buf start) (- end start))) + (nix:write output-handle (inc-pointer buf start) (- end start))) (nix:ewouldblock () 0) (:no-error (nbytes) (if (zerop nbytes) :eof nbytes))))) -(defun write-octets-with-timeout (fd buffer start end timeout) - (declare (type unsigned-byte fd) - (type ub8-sarray buffer) - (type unsigned-byte start end)) +(defun write-octets/timeout (output-handle buffer start end timeout) + (declare (type unsigned-byte output-handle) + (type iobuf-buffer buffer) + (type iobuf-index start end) + (type device-timeout timeout)) (with-pointer-to-vector-data (buf buffer) - (nix:repeat-decreasing-timeout (remaining timeout nil) + (nix:repeat-decreasing-timeout (remaining timeout :rloop) (flet ((check-timeout () (if (plusp remaining) - (iomux:wait-until-fd-ready fd :output remaining) - (return 0)))) + (iomux:wait-until-fd-ready output-handle :output remaining) + (return-from :rloop 0)))) (handler-case - (nix:write fd (inc-pointer buf start) (- end start)) + (nix:write output-handle (inc-pointer buf start) (- end start)) (nix:eintr () (check-timeout)) (nix:ewouldblock () (check-timeout)) (:no-error (nbytes) - (return (if (zerop nbytes) :eof nbytes)))))))) + (if (zerop nbytes) :eof nbytes))))))) diff --git a/io.streams/zeta/internal.lisp b/io.streams/zeta/internal.lisp index 7459037..dba4183 100644 --- a/io.streams/zeta/internal.lisp +++ b/io.streams/zeta/internal.lisp @@ -5,48 +5,46 @@ (in-package :io.zeta-streams) +(declaim (optimize speed)) + ;;;; Foreign Buffers -(define-constant +bytes-per-iobuf+ 4096) +(define-constant +default-iobuf-size+ 4096) ;;; almost 128 MB: large enough for a stream buffer, ;;; but small enough to fit into a fixnum (deftype iobuf-index () '(unsigned-byte 27)) (deftype iobuf-length () '(integer 0 #.(expt 2 27))) -(deftype iobuf-buffer () '(simple-array ub8 (*))) +(deftype iobuf-buffer () 'ub8-sarray) + +(defparameter *empty-array* (make-array 0 :element-type 'ub8)) (defstruct (iobuf (:constructor %make-iobuf ())) - (data (make-array 0 :element-type 'ub8) - :type iobuf-buffer) - (size 0 :type iobuf-length) + (data *empty-array* :type iobuf-buffer) (start 0 :type iobuf-index) (end 0 :type iobuf-index)) -(defun make-iobuf (&optional (size +bytes-per-iobuf+)) +(defun make-iobuf (&optional size) + (declare (type (or null iobuf-index) size)) (let ((b (%make-iobuf))) - (setf (iobuf-data b) (make-array size :element-type 'ub8) - (iobuf-size b) size) + (setf (iobuf-data b) (make-array (or size +default-iobuf-size+) + :element-type 'ub8 + :initial-element 0)) (values b))) -(defun iobuf-length (iobuf) +(defun iobuf-size (iobuf) (declare (type iobuf iobuf)) - (- (iobuf-end iobuf) - (iobuf-start iobuf))) + (length (iobuf-data iobuf))) -(defun iobuf-empty-p (iobuf) +(defun iobuf-available-octets (iobuf) (declare (type iobuf iobuf)) - (= (iobuf-end iobuf) + (- (iobuf-end iobuf) (iobuf-start iobuf))) -(defun iobuf-full-p (iobuf) - (declare (type iobuf iobuf)) - (= (iobuf-end iobuf) - (iobuf-size iobuf))) - -(defun iobuf-end-space-length (iobuf) +(defun iobuf-empty-p (iobuf) (declare (type iobuf iobuf)) - (- (iobuf-size iobuf) + (= (iobuf-start iobuf) (iobuf-end iobuf))) (defun iobuf-reset (iobuf) @@ -79,7 +77,41 @@ (defun iobuf-push-octet (iobuf octet) (declare (type iobuf iobuf) - (type (unsigned-byte 8) octet)) + (type ub8 octet)) (let ((end (iobuf-end iobuf))) (prog1 (setf (bref iobuf end) octet) (incf (iobuf-end iobuf))))) + +(defun replace-ub8 (destination source start1 end1 start2 end2) + (declare (type iobuf-buffer destination source) + (type iobuf-index start1 start2 end1 end2)) + (let ((nbytes (min (- end1 start1) + (- end2 start2)))) + (replace destination source + :start1 start1 :end1 end1 + :start2 start2 :end2 end2) + (values destination nbytes))) + +(defun iobuf->array (array iobuf start end) + (declare (type iobuf-buffer array) + (type iobuf iobuf) + (type iobuf-index start end)) + (let ((nbytes + (nth-value 1 (replace-ub8 array (iobuf-data iobuf) + start end + (iobuf-start iobuf) + (iobuf-end iobuf))))) + (incf (iobuf-start iobuf) nbytes) + (values nbytes))) + +(defun array->iobuf (iobuf array start end) + (declare (type iobuf-buffer array) + (type iobuf iobuf) + (type iobuf-index start end)) + (let ((nbytes + (nth-value 1 (replace-ub8 (iobuf-data iobuf) array + (iobuf-start iobuf) + (iobuf-end iobuf) + start end)))) + (incf (iobuf-end iobuf) nbytes) + (values nbytes))) diff --git a/io.zeta-streams.asd b/io.zeta-streams.asd index 7631310..898b123 100644 --- a/io.zeta-streams.asd +++ b/io.zeta-streams.asd @@ -19,4 +19,7 @@ (:file "internal" :depends-on ("pkgdcl" "classes" "conditions" "common")) ;; Devices - (:file "file" :depends-on ("pkgdcl" "classes" "conditions" "common" "internal")))) + (:file "file" :depends-on ("pkgdcl" "classes" "conditions" "common" "internal")) + + ;; Buffers + (:file "buffer" :depends-on ("pkgdcl" "classes" "conditions" "common")))) -- 2.11.4.GIT