From 0269784d5f870d629d9d7aec74efcbb2231ae036 Mon Sep 17 00:00:00 2001 From: Vitaly Mayatskikh Date: Fri, 24 Apr 2009 14:50:53 +0200 Subject: [PATCH] Serialize access to node from multiple threads. Signed-off-by: Vitaly Mayatskikh --- cluster.lisp | 96 +++++++++++++++++++++++++++++++++++++++--------------------- package.lisp | 2 +- 2 files changed, 63 insertions(+), 35 deletions(-) diff --git a/cluster.lisp b/cluster.lisp index 86db871..19514e5 100644 --- a/cluster.lisp +++ b/cluster.lisp @@ -36,7 +36,8 @@ (process :initform nil :accessor node-process) (input :initform nil :accessor node-input) (output :initform nil :accessor node-output) - (sexp :initform nil :accessor node-sexp))) + (sexp :initform nil :accessor node-sexp) + (lock :initform (make-mutex) :accessor node-lock))) (defmethod print-object ((object node) stream) (format stream "#N" (node-host object) (node-lisp object))) @@ -52,29 +53,30 @@ (:documentation "Establish connection with remote host.")) (defmethod node-connect ((object node)) - (with-slots (host lisp process input output) object - (when (not (sb-ext:process-p process)) - (when (= 0 (system *ping* (format nil "-c 1 -w 1 ~a" host))) - (setq process - (remote *ssh* (format nil "-l ~a ~a ~a" - *lisp-user* host lisp)) - input (sb-ext:process-input process) - output (sb-ext:process-output process)) - (let ((proc process)) - (sb-ext:finalize object - (lambda () ;(format t "finalize ~a~%" proc) - (when proc (sb-ext:process-close proc)))))) - (node-flush object t) ; discard prompt and other trash - process))) - -(defmethod initialize-instance :after ((instance node) &key connect &allow-other-keys) + (with-mutex ((node-mutex object)) + (with-slots (host lisp process input output) object + (when (not (sb-ext:process-p process)) + (when (= 0 (system *ping* (format nil "-c 1 -w 1 ~a" host))) + (setq process + (remote *ssh* (format nil "-l ~a ~a ~a" + *lisp-user* host lisp)) + input (sb-ext:process-input process) + output (sb-ext:process-output process)) + (let ((proc process)) + (sb-ext:finalize object + (lambda () ;(format t "finalize ~a~%" proc) + (when proc (sb-ext:process-close proc)))))) + (node-flush object t) ; discard prompt and other trash + process)))) + +(defmethod initialize-instance :after ((object node) &key connect &allow-other-keys) (when connect - (node-connect instance))) + (node-connect object))) -(defgeneric node-send (object msg) +(defgeneric node-send/unsafe (object msg) (:documentation "Send command to remote host.")) -(defmethod node-send ((object node) msg) +(defmethod node-send/unsafe ((object node) msg) (when (not (node-alive-p object)) (error (mkstr "Node " object " not connected"))) (with-slots (input) object @@ -82,12 +84,19 @@ (princ #\Newline input) (force-output input))) -(defgeneric node-recv (object &optional non-blocking) +(defgeneric node-send (object msg) + (:documentation "Send command to remote host (thread-safe).")) + +(defmethod node-send ((object node) msg) + (with-mutex ((node-lock object)) + (node-send/unsafe object msg))) + +(defgeneric node-recv/unsafe (object &optional non-blocking) (:documentation "Receive data from remote host. If optional argument `non-blocking' is set, don't wait for data.")) -(defmethod node-recv ((object node) &optional non-blocking) +(defmethod node-recv/unsafe ((object node) &optional non-blocking) (when (not (node-alive-p object)) (error (mkstr "Node " object " not connected"))) (with-slots (output) object @@ -96,12 +105,21 @@ data.")) (read output)) (read output)))) -(defgeneric node-flush (object &optional wait-input) +(defgeneric node-recv (object &optional non-blocking) + (:documentation "Receive data from remote host (thread-safe). +If optional argument `non-blocking' is set, don't wait for +data.")) + +(defmethod node-recv ((object node) &optional non-blocking) + (with-mutex ((node-lock object)) + (node-recv/unsafe object non-blocking))) + +(defgeneric node-flush/unsafe (object &optional wait-input) (:documentation "Flush available input data. Wait for input when optional argument `wait-input' is set. This is useful to skip interactive prompt.")) -(defmethod node-flush ((object node) &optional wait-input) +(defmethod node-flush/unsafe ((object node) &optional wait-input) (when wait-input (with-slots (output) object (let ((timeout 60.0)) @@ -109,20 +127,30 @@ This is useful to skip interactive prompt.")) (sleep 0.25))))) (clear-input (node-output object))) +(defgeneric node-flush (object &optional wait-input) + (:documentation "Flush available input data (thread-safe). +Wait for input when optional argument `wait-input' is set. +This is useful to skip interactive prompt.")) + +(defmethod node-flush ((object node) &optional wait-input) + (with-mutex ((node-lock object)) + (node-flush/unsafe object wait-input))) + (defgeneric node-exec (object cmd &optional trap-errors) (:documentation "Execute command on remote host and return result.")) (defmethod node-exec ((object node) cmd &optional (trap-errors t)) - (node-flush object) - (setf (node-sexp object) cmd) - (when trap-errors - (setq cmd (mkstr "(handler-case " cmd - " (error (condition) (list 'error (format nil \"~a\" condition))))"))) - (node-send object cmd) - (let ((answer (node-recv object))) - (when (and (listp answer) trap-errors (eq (car answer) 'error)) - (error "Error: ~a~%In form: ~a~%At node: ~a~%" (cadr answer) (node-sexp object) object)) - answer)) + (with-mutex ((node-lock object)) + (node-flush/unsafe object) + (setf (node-sexp object) cmd) + (when trap-errors + (setq cmd (mkstr "(handler-case " cmd + " (error (condition) (list 'error (format nil \"~a\" condition))))"))) + (node-send/unsafe object cmd) + (let ((answer (node-recv/unsafe object))) + (when (and (listp answer) trap-errors (eq (car answer) 'error)) + (error "Error: ~a~%In form: ~a~%At node: ~a~%" (cadr answer) (node-sexp object) object)) + answer))) (defmacro with-remote (node &body body) "Execute body at remote host." diff --git a/package.lisp b/package.lisp index 53e1c46..4ed5bf9 100644 --- a/package.lisp +++ b/package.lisp @@ -16,7 +16,7 @@ ;; along with this program. If not, see . (defpackage #:cl-cluster - (:use :cl :split-sequence :cl-pmap :cl-lol) + (:use :cl :sb-thread :split-sequence :cl-pmap :cl-lol) (:export :node :node-host :node-lisp -- 2.11.4.GIT