From f4e54ab4e21decfbeb9771b42e07c3b55f4bbcbb Mon Sep 17 00:00:00 2001 From: Stelian Ionescu Date: Mon, 19 Jan 2009 01:00:37 +0100 Subject: [PATCH] More work on memory streams. --- io.streams/zeta/buffer.lisp | 214 +++++++++++++++++++++++++++++------------ io.streams/zeta/device.lisp | 5 +- io.streams/zeta/file-unix.lisp | 6 +- io.streams/zeta/stream.lisp | 26 +++++ 4 files changed, 189 insertions(+), 62 deletions(-) diff --git a/io.streams/zeta/buffer.lisp b/io.streams/zeta/buffer.lisp index 8e2792a..9475be2 100644 --- a/io.streams/zeta/buffer.lisp +++ b/io.streams/zeta/buffer.lisp @@ -15,8 +15,7 @@ (defclass device-buffer (buffer) ((synchronized :initarg :synchronized :reader %db-synchronized-p) - (device :initform nil - :initarg :device + (device :initarg :device :accessor %db-device) (input-iobuf :initarg :input-buffer :accessor %db-input-iobuf) @@ -34,16 +33,22 @@ ()) (defclass memory-buffer (buffer) - ((data-vector :initarg :data - :accessor %mb-data-vector) + ((data-vector :accessor %mb-data-vector) + (element-type :accessor %mb-element-type) (input-position :initform 0 :accessor %mb-input-position) (output-position :initform 0 :accessor %mb-output-position) - (adjust-size :initarg :adjust-size - :accessor %mb-adjust-size) - (adjust-threshold :initarg :adjust-threshold - :accessor %mb-adjust-threshold))) + (adjust-size :accessor %mb-adjust-size) + (adjust-threshold :accessor %mb-adjust-threshold))) + +(defclass octet-memory-buffer (memory-buffer) + () + (:default-initargs :element-type 'octet)) + +(defclass character-memory-buffer (memory-buffer) + () + (:default-initargs :element-type 'character)) ;;;------------------------------------------------------------------------- @@ -60,9 +65,9 @@ ;;; I/O functions -(defgeneric zstream-read-octet (buffer &key timeout)) +(defgeneric zstream-read-element (buffer &key timeout)) -(defgeneric zstream-write-octet (buffer byte &key timeout)) +(defgeneric zstream-write-element (buffer element &key timeout)) (defgeneric zstream-read-vector (buffer vector &key start end timeout)) @@ -106,7 +111,9 @@ (defgeneric %zstream-clear-output (buffer)) -(defgeneric %ensure-memory-buffer-capacity (buffer amount)) +(defgeneric %ensure-memory-buffer-capacity (buffer &optional amount)) + +(defgeneric %check-memory-buffer-available-data (buffer &optional amount)) ;;;------------------------------------------------------------------------- @@ -116,17 +123,32 @@ (defmethod zstream-synchronized-p ((buffer device-buffer)) (%db-synchronized-p buffer)) +(defmethod zstream-synchronized-p ((buffer memory-buffer)) + (declare (ignore buffer)) + ;; FIXME: signal proper condition + (error "Only device streams are synchronized")) + (defmethod zstream-device ((buffer device-buffer)) (%db-device buffer)) +(defmethod zstream-device ((buffer memory-buffer)) + ;; FIXME: signal proper condition + (error "This is not a device stream: ~S" buffer)) + (defmethod (setf zstream-device) (new-device (buffer device-buffer)) (setf (%db-device buffer) new-device)) + +(defmethod (setf zstream-device) (new-device (buffer memory-buffer)) + (declare (ignore new-device)) + ;; FIXME: signal proper condition + (error "This is not a device stream: ~S" buffer)) ;;;------------------------------------------------------------------------- ;;; Helper macros ;;;------------------------------------------------------------------------- +;; FIXME: synchronize memory buffers too ? (defmacro with-synchronized-buffer ((buffer &optional direction) &body body) (with-gensyms (body-fun) (labels ((make-locks (body direction) @@ -182,12 +204,15 @@ (defmethod shared-initialize :after ((buffer memory-buffer) slot-names - &key data (start 0) end element-type + &key data (start 0) end (element-type t) (adjust-size 1.5) (adjust-threshold 1)) (declare (ignore slot-names)) - ;; FIXME: signal proper condition - (assert (> adjust-size 1)) - (assert (<= adjust-threshold 1)) + (check-type adjust-size (real 1.001)) + (check-type adjust-threshold (real 0.1 1)) + (setf (%mb-adjust-size buffer) adjust-size + (%mb-adjust-threshold buffer) adjust-threshold + (%mb-element-type buffer) (upgraded-array-element-type + element-type)) (cond (data (check-bounds data start end) @@ -209,14 +234,14 @@ ;;; RELINQUISH ;;;------------------------------------------------------------------------- -(defmethod relinquish ((buffer single-channel-buffer) &key abort) +(defmethod relinquish :after ((buffer single-channel-buffer) &key abort) (with-synchronized-buffer (buffer :input) (unless abort (%zstream-flush-output buffer 0)) (relinquish (zstream-device buffer) :abort abort)) (values buffer)) -(defmethod relinquish ((buffer dual-channel-buffer) &key abort) +(defmethod relinquish :after ((buffer dual-channel-buffer) &key abort) (with-synchronized-buffer (buffer :io) (unless abort (%zstream-flush-output buffer 0)) @@ -225,14 +250,21 @@ ;;;------------------------------------------------------------------------- -;;; READ-OCTET +;;; READ-ELEMENT ;;;------------------------------------------------------------------------- -(defmethod zstream-read-octet ((buffer buffer) &key timeout) - (let ((v (make-array 1 :element-type 'ub8))) +(defmethod zstream-read-element ((buffer device-buffer) &key timeout) + (let ((v (make-array 1 :element-type 'octet))) (declare (dynamic-extent v)) (zstream-read-vector buffer v :timeout timeout) (aref v 0))) + +(defmethod zstream-read-element ((buffer memory-buffer) &key timeout) + (declare (ignore timeout)) + (let ((v (make-array 1 :element-type (%mb-element-type buffer)))) + (declare (dynamic-extent v)) + (zstream-read-vector buffer v) + (aref v 0))) ;;;------------------------------------------------------------------------- @@ -255,7 +287,8 @@ (with-synchronized-buffer (buffer :input) (%zstream-read-vector buffer vector start end timeout))) -(defmethod %zstream-read-vector ((buffer device-buffer) vector start end timeout) +(defmethod %zstream-read-vector ((buffer device-buffer) vector + start end timeout) (with-accessors ((input-iobuf %db-input-iobuf)) buffer (cond @@ -266,17 +299,38 @@ (iobuf->vector input-iobuf vector start end)))) (t (iobuf->vector input-iobuf vector start end))))) + +(defmethod zstream-read-vector ((buffer memory-buffer) vector + &key start end timeout) + (declare (ignore timeout)) + (with-accessors ((data-vector %mb-data-vector) + (input-position %mb-input-position) + (output-position %mb-output-position)) + buffer + (%check-memory-buffer-available-data buffer 1) + (replace vector data-vector + :start1 input-position :end1 output-position + :start2 start :end2 end) + (incf input-position (min (- output-position input-position) + (- end start))))) ;;;------------------------------------------------------------------------- -;;; WRITE-OCTET +;;; WRITE-ELEMENT ;;;------------------------------------------------------------------------- -(defmethod zstream-write-octet ((buffer buffer) octet &key timeout) - (check-type octet (unsigned-byte 8)) - (let ((v (make-array 1 :element-type 'ub8 :initial-contents octet))) +(defmethod zstream-write-element ((buffer device-buffer) octet &key timeout) + (check-type octet octet) + (let ((v (make-array 1 :element-type 'octet :initial-contents octet))) (declare (dynamic-extent v)) (zstream-write-vector buffer v :timeout timeout))) + +(defmethod zstream-write-element ((buffer memory-buffer) element &key timeout) + (declare (ignore timeout)) + (let ((v (make-array 1 :element-type (%mb-element-type buffer) + :initial-contents element))) + (declare (dynamic-extent v)) + (zstream-write-vector buffer v))) ;;;------------------------------------------------------------------------- @@ -317,7 +371,6 @@ (defmethod zstream-write-vector ((buffer memory-buffer) vector &key (start 0) end timeout) (declare (ignore timeout)) - (check-bounds vector start end) (with-accessors ((data-vector %mb-data-vector) (output-position %mb-output-position)) buffer @@ -345,29 +398,34 @@ (device-position (zstream-device buffer))) (defmethod zstream-io-position ((buffer memory-buffer)) - (declare (ignore buffer)) - ;; FIXME: signal an error because it has two cursors ? - nil) - -(defmethod (setf zstream-io-position) - (position (buffer single-channel-buffer) &optional (from :start)) - (setf (device-position (zstream-device buffer) from) position)) + ;; FIXME: signal proper error + (error "~S is a dual-cursor stream" buffer)) (defmethod (setf zstream-io-position) - (position (buffer dual-channel-buffer) &optional (from :start)) + (position (buffer device-buffer) &optional (from :start)) (setf (device-position (zstream-device buffer) from) position)) (defmethod (setf zstream-io-position) - (position (buffer dual-channel-buffer) &optional (from :start)) - (declare (ignore position buffer from)) - ;; FIXME: signal an error because it has two cursors ? - nil) + (position (buffer memory-buffer) &optional from) + (declare (ignore position from)) + ;; FIXME: signal proper error + (error "~S is a dual-cursor stream" buffer)) ;;;------------------------------------------------------------------------- ;;; INPUT-POSITION ;;;------------------------------------------------------------------------- +(defmethod zstream-input-position ((buffer device-buffer)) + ;; FIXME: signal proper error + (error "~S is a single-cursor stream" buffer)) + +(defmethod (setf zstream-input-position) + (offset (buffer device-buffer) &optional from) + (declare (ignore offset from)) + ;; FIXME: signal proper error + (error "~S is a single-cursor stream" buffer)) + (defmethod zstream-input-position ((buffer memory-buffer)) (%mb-input-position buffer)) @@ -396,6 +454,16 @@ ;;; OUTPUT-POSITION ;;;------------------------------------------------------------------------- +(defmethod zstream-output-position ((buffer device-buffer)) + ;; FIXME: signal proper error + (error "~S is a single-cursor stream" buffer)) + +(defmethod (setf zstream-output-position) + (offset (buffer device-buffer) &optional from) + (declare (ignore offset from)) + ;; FIXME: signal proper error + (error "~S is a single-cursor stream" buffer)) + (defmethod zstream-output-position ((buffer memory-buffer)) (%mb-output-position buffer)) @@ -411,10 +479,7 @@ (:start offset) (:current (+ output-position offset)) (:input (+ input-position offset))))) - (unless (<= input-position newpos) - ;; FIXME: signal proper condition - (error "Wrong sequence bounds. start: ~S end: ~S" - input-position newpos)) + (check-bounds data-vector input-position newpos) (%ensure-memory-buffer-capacity buffer (- newpos output-position)) (setf output-position newpos)))) @@ -423,7 +488,7 @@ ;;; CLEAR-INPUT ;;;------------------------------------------------------------------------- -(defmethod zstream-clear-input ((buffer single-channel-buffer)) +(defmethod zstream-clear-input ((buffer device-buffer)) (with-synchronized-buffer (buffer :input) (%zstream-clear-input buffer))) @@ -434,19 +499,18 @@ (setf (%buffer-position buffer :current) (- nbytes))) (iobuf-reset (%db-input-iobuf buffer))))) -(defmethod zstream-clear-input ((buffer buffer)) - (with-synchronized-buffer (buffer :input) - (%zstream-clear-input buffer))) - (defmethod %zstream-clear-input ((buffer dual-channel-buffer)) (iobuf-reset (%db-input-iobuf buffer))) + +(defmethod zstream-clear-input ((buffer memory-buffer)) + (setf (%mb-input-position buffer) (%mb-output-position buffer))) ;;;------------------------------------------------------------------------- ;;; CLEAR-OUTPUT ;;;------------------------------------------------------------------------- -(defmethod zstream-clear-output ((buffer single-channel-buffer)) +(defmethod zstream-clear-output ((buffer device-buffer)) (with-synchronized-buffer (buffer :output) (%zstream-clear-output buffer))) @@ -454,9 +518,11 @@ (when (%scb-dirtyp buffer) (iobuf-reset (%db-output-iobuf buffer)))) -(defmethod zstream-clear-output ((buffer dual-channel-buffer)) - (with-synchronized-buffer (buffer :output) - (iobuf-reset (%db-output-iobuf buffer)))) +(defmethod %zstream-clear-output ((buffer dual-channel-buffer)) + (iobuf-reset (%db-output-iobuf buffer))) + +(defmethod zstream-clear-output ((buffer memory-buffer)) + (setf (%mb-output-position buffer) (%mb-input-position buffer))) ;;;------------------------------------------------------------------------- @@ -472,7 +538,7 @@ (with-synchronized-buffer (buffer :input) (%zstream-fill-input buffer timeout))) -(defmethod %zstream-fill-input ((buffer buffer) timeout) +(defmethod %zstream-fill-input ((buffer device-buffer) timeout) (with-accessors ((device zstream-device) (input-iobuf %db-input-iobuf)) buffer @@ -487,6 +553,10 @@ (unsigned-byte (setf (iobuf-end input-iobuf) (+ start nbytes)) (values nbytes (iobuf-available-space input-iobuf)))))))) + +(defmethod zstream-fill-input ((buffer memory-buffer) &key timeout) + (declare (ignore buffer timeout)) + (values nil)) ;;;------------------------------------------------------------------------- @@ -497,7 +567,7 @@ (with-synchronized-buffer (buffer :output) (%zstream-flush-output buffer timeout))) -(defmethod %zstream-flush-output ((buffer buffer) timeout) +(defmethod %zstream-flush-output ((buffer device-buffer) timeout) (with-accessors ((device zstream-device) (output-iobuf %db-output-iobuf)) buffer @@ -518,13 +588,17 @@ (declare (ignore timeout)) (when (iobuf-empty-p (%db-output-iobuf buffer)) (setf (%scb-dirtyp buffer) nil))) + +(defmethod zstream-flush-output ((buffer memory-buffer) &key timeout) + (declare (ignore buffer timeout)) + (values nil)) ;;;------------------------------------------------------------------------- ;;; MEMORY-BUFFER GROW ;;;------------------------------------------------------------------------- -(defmethod %ensure-memory-buffer-capacity ((buffer memory-buffer) amount) +(defmethod %ensure-memory-buffer-capacity ((buffer memory-buffer) &optional (amount 1)) (check-type amount unsigned-byte) (with-accessors ((data-vector %mb-data-vector) (output-position %mb-output-position) @@ -532,16 +606,38 @@ (adjust-threshold %mb-adjust-threshold)) buffer (let* ((size-needed (+ output-position amount)) - (threshold (* adjust-threshold size-needed))) - (when (>= threshold (length data-vector)) + (threshold (ceiling (* adjust-threshold size-needed)))) + (when (> threshold (length data-vector)) (setf data-vector - (adjust-array data-vector (truncate (* adjust-size - size-needed)))))))) + (adjust-array data-vector + (truncate (* adjust-size size-needed)))))))) + +(defmethod %check-memory-buffer-available-data ((buffer memory-buffer) &optional (amount 1)) + (check-type amount positive-integer) + (with-accessors ((input-position %mb-input-position) + (output-position %mb-output-position)) + buffer + (let ((available-data (- output-position input-position))) + (check-type available-data unsigned-byte) + (cond + ((zerop available-data) + (error 'end-of-file :stream buffer)) + ((< available-data amount) + ;; FIXME: signal proper condition, soft EOF + (error "~S elements requested, only ~S available" + amount available-data)))))) ;;;------------------------------------------------------------------------- ;;; I/O WAIT ;;;------------------------------------------------------------------------- -(defmethod zstream-poll ((buffer buffer) &key direction timeout) +(defmethod zstream-poll ((buffer device-buffer) &key direction timeout) (device-poll (zstream-device buffer) direction timeout)) + +(defmethod zstream-poll ((buffer memory-buffer) &key direction timeout) + (declare (ignore timeout)) + (ecase direction + (:input (< (%mb-input-position buffer) + (%mb-output-position buffer))) + (:output t))) diff --git a/io.streams/zeta/device.lisp b/io.streams/zeta/device.lisp index a01bdeb..0fed6ed 100644 --- a/io.streams/zeta/device.lisp +++ b/io.streams/zeta/device.lisp @@ -76,8 +76,10 @@ ;;; Default no-op methods ;;;------------------------------------------------------------------------- +(defmethod relinquish (device &key abort) + (declare (ignore device abort))) + (defmethod device-position ((device device)) - (declare (ignore device)) ;; FIXME: signal proper condition (error "Device not seekable: ~S" device)) @@ -87,7 +89,6 @@ (error "Device not seekable: ~S" device)) (defmethod device-length ((device device)) - (declare (ignore device)) ;; FIXME: signal proper condition (error "Device not seekable: ~S" device)) diff --git a/io.streams/zeta/file-unix.lisp b/io.streams/zeta/file-unix.lisp index a28b121..64a404d 100644 --- a/io.streams/zeta/file-unix.lisp +++ b/io.streams/zeta/file-unix.lisp @@ -145,7 +145,11 @@ ;;;------------------------------------------------------------------------- (defmethod device-poll ((device file-device) direction &optional timeout) - (poll-fd (device-handle device) direction timeout)) + (multiple-value-bind (readp rhupp writep whupp) + (poll-fd (device-handle device) direction timeout) + (ecase direction + (:input (values readp rhupp)) + (:output (values writep whupp))))) ;;;------------------------------------------------------------------------- diff --git a/io.streams/zeta/stream.lisp b/io.streams/zeta/stream.lisp index 7e923c7..3e77c2a 100644 --- a/io.streams/zeta/stream.lisp +++ b/io.streams/zeta/stream.lisp @@ -23,6 +23,8 @@ (defclass memory-zstream (memory-buffer zstream) ()) + +(defclass octet-memory-zstream (memory-zstream) ()) ;;;------------------------------------------------------------------------- @@ -64,6 +66,30 @@ &key (external-format :default)) (declare (ignore slot-names)) (setf (zstream-external-format stream) external-format)) + +(defun make-memory-zstream (&key data (start 0) end (element-type t) + (adjust-size 1.5) (adjust-threshold 1)) + (let ((element-type (upgraded-array-element-type element-type))) + (cond + ((subtypep element-type 'octet) + (make-instance 'octet-memory-zstream + :data data :start start :end end + :adjust-size adjust-size + :adjust-threshold adjust-threshold)) + ((subtypep element-type 'character) + (make-instance 'character-memory-zstream + :data data :start start :end end + :adjust-size adjust-size + :adjust-threshold adjust-threshold)) + ((subtypep element-type 't) + (make-instance 'memory-zstream + :data data :start start :end end + :element-type element-type + :adjust-size adjust-size + :adjust-threshold adjust-threshold)) + (t + (error 'subtype-error :datum element-type + :expected-supertype '(or (unsigned-byte 8) character t)))))) ;;;------------------------------------------------------------------------- -- 2.11.4.GIT