1 ;; -*- lexical-binding: t; -*-
4 ;; Copyright (C) 2013 Grégoire Jadi
6 ;; Author: Grégoire Jadi <gregoire.jadi@gmail.com>
8 ;; This program is free software: you can redistribute it and/or
9 ;; modify it under the terms of the GNU General Public License as
10 ;; published by the Free Software Foundation, either version 3 of
11 ;; the License, or (at your option) any later version.
13 ;; This program is distributed in the hope that it will be useful,
14 ;; but WITHOUT ANY WARRANTY; without even the implied warranty of
15 ;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 ;; GNU General Public License for more details.
18 ;; You should have received a copy of the GNU General Public License
19 ;; along with this program. If not, see <http://www.gnu.org/licenses/>.
26 (require 'parallel-remote
)
;; Declares the `parallel' customization group.
28 (defgroup parallel nil
29 "Execute stuff in parallel"
;; Delay handed to `sleep-for' by `parallel-wait' between two polls of a
;; task's readiness.
32 (defcustom parallel-sleep
0.05
33 "How many sec should we wait while polling."
;; Plist of fallback settings.  `parallel--set-option' and `parallel-start'
;; read keys such as `:emacs-path' from it with `plist-get' when neither the
;; explicit argument nor the per-call `config' plist provides a value.
37 (defcustom parallel-config nil
38 "Global config setting to use."
;; Network process created by `parallel--init-server'; its `:service' is
;; what gets forwarded to, and announced to, the remote Emacs.
42 (defvar parallel--server nil
)
;; Task symbols currently tracked: `parallel-start' pushes onto this list
;; and `parallel--sentinel' removes the task once its process has ended.
43 (defvar parallel--tasks nil
)
;; ssh tunnel processes: `parallel-make-tunnel' pushes onto this list and
;; `parallel-stop-tunnel' removes from it.
44 (defvar parallel--tunnels nil
)
46 ;; Declare external function
47 (declare-function parallel-send
"parallel-remote")
;; Set up an ssh reverse tunnel for USERNAME at HOSTNAME and register it.
;;
;; Steps visible here:
;;  - make sure the local server exists (`parallel--init-server');
;;  - look for a process with `find-if', using a predicate that compares
;;    USERNAME with the candidate's `username' property and also reads its
;;    `hostname' property;
;;  - start "ssh -N -R 0:localhost:PORT USERNAME@HOSTNAME" as the process
;;    "parallel-ssh", PORT being the `:service' of `parallel--server';
;;  - tag the process with `username' and `hostname' properties and install
;;    `parallel--tunnel-filter', which stores the `service' property;
;;  - loop while the `service' property is still nil;
;;  - push the process onto `parallel--tunnels'.
;;
;; NOTE(review): the list searched by `find-if', the condition guarding the
;; ssh start-up, the body of the wait loop and the return value are not
;; visible in this text -- confirm before relying on them.
49 (defun parallel-make-tunnel (username hostname
)
50 (parallel--init-server)
51 (let ((tunnel (find-if (lambda (tun)
52 (and (string= username
53 (process-get tun
'username
))
55 (process-get tun
'hostname
))))
58 (setq tunnel
(start-process "parallel-ssh" nil
"ssh"
59 "-N" "-R" (format "0:localhost:%s"
60 (process-contact parallel--server
:service
))
61 (format "%s@%s" username hostname
)))
62 (process-put tunnel
'username username
)
63 (process-put tunnel
'hostname hostname
)
64 (set-process-filter tunnel
#'parallel--tunnel-filter
)
65 (while (null (process-get tunnel
'service
))
67 (push tunnel parallel--tunnels
))
(defun parallel-stop-tunnel (tunnel)
  "Forget TUNNEL and kill its process.
TUNNEL is first removed from `parallel--tunnels' and then handed to
`delete-process'."
  (let ((remaining (delq tunnel parallel--tunnels)))
    (setq parallel--tunnels remaining)
    (delete-process tunnel)))
(defun parallel--tunnel-filter (proc output)
  "Process filter recording the port ssh allocated for the tunnel PROC.
OUTPUT is a chunk of text printed by the ssh process.  When it
contains OpenSSH's \"Allocated port N for remote forward ...\"
message, N is stored, as a string, in the `service' property of
PROC.  Any other output is ignored."
  ;; Only trust the number that follows \"Allocated port\".  Taking the
  ;; first run of digits found anywhere in OUTPUT would pick up unrelated
  ;; numbers, e.g. the IP address in a \"Permanently added '192.168.1.5'\"
  ;; host-key warning printed before the port is announced.
  (when (string-match "Allocated port \\([0-9]+\\)" output)
    (process-put proc 'service (match-string 1 output))))
(defmacro parallel--set-option (place config)
  "Give PLACE a value from CONFIG or `parallel-config' if it has none.
PLACE is a symbol; the plist key looked up is the keyword of the
same name (`foo' -> `:foo').  The expansion keeps PLACE's current
value when it is non-nil, otherwise tries CONFIG and finally the
global `parallel-config'."
  (let ((key (intern (format ":%s" (symbol-name place)))))
    `(setf ,place
           (or ,place
               (plist-get ,config ,key)
               (plist-get parallel-config ,key)))))
;; Macro taking CONFIG followed by any number of OPTIONS.  The visible part
;; of the template splices in one `(parallel--set-option OPTION CONFIG)'
;; form per element of OPTIONS, built with `loop ... collect'.
;; NOTE(review): the form that wraps the spliced list is not visible in this
;; text -- confirm.
83 (defmacro parallel--set-options
(config &rest options
)
85 ,@(loop for option in options
86 collect
`(parallel--set-option ,option
,config
))))
;; Entry point that launches EXEC-FUN in another Emacs process and tracks it
;; as a task.
;;
;; What the visible code does, in order:
;;  - ensures the local server exists (`parallel--init-server');
;;  - fills unset keyword arguments from `config' / `parallel-config' through
;;    `parallel--set-options';
;;  - defaults `emacs-path' to `invocation-name' expanded in
;;    `invocation-directory', and `library-path' to the location of the
;;    "parallel-remote" library;
;;  - creates a fresh task symbol (`parallel--new-task'), pushes it onto
;;    `parallel--tasks' and initialises its properties: `initialized' nil,
;;    `exec-fun', optional `post-exec' and `on-event' (stored only when they
;;    satisfy `functionp'), `results' nil, `status' `run', `queue' nil;
;;  - when both USERNAME and HOSTNAME are given, prepares `ssh-args': a "-R"
;;    forwarding of HOSTPORT to the local server's `:service', or a tunnel
;;    from `parallel-make-tunnel' whose `service' property becomes HOSTPORT,
;;    then "-X" if GRAPHICAL and the "USERNAME@HOSTNAME" destination;
;;  - builds `emacs-args': "-Q -l LIBRARY-PATH", "-batch" unless GRAPHICAL,
;;    "--eval" forms setting `parallel-service', `parallel-task-id',
;;    `debug-on-error' and `parallel-continue-when-executed' in the child,
;;    and "-f parallel-remote--init";
;;  - starts the process with `start-process' (through "ssh" on the visible
;;    path), stores it in the task's `proc' property and installs
;;    `parallel--sentinel' on it;
;;  - schedules a timer with `run-at-time' after TIMEOUT that may call
;;    `parallel-stop' on the task depending on its status.
;;
;; NOTE(review): the end of the argument list, the conditions selecting
;; between the ssh variants, the non-ssh launch path, the status values
;; tested by the timer and the return value are not visible in this text --
;; confirm before relying on them.
88 (defun* parallel-start
(exec-fun &key post-exec env timeout
89 emacs-path library-path emacs-args
90 graphical debug on-event continue-when-executed
91 username hostname hostport
93 (parallel--init-server)
95 ;; Initialize parameters
96 (parallel--set-options config
104 continue-when-executed
109 (setq emacs-path
(or emacs-path
110 (plist-get config
:emacs-path
)
111 (plist-get parallel-config
:emacs-path
)
112 (expand-file-name invocation-name
113 invocation-directory
))
114 library-path
(or library-path
115 (plist-get config
:library-path
)
116 (plist-get parallel-config
:library-path
)
117 (locate-library "parallel-remote")))
119 (let ((task (parallel--new-task))
120 proc tunnel ssh-args
)
121 (push task parallel--tasks
)
122 (put task
'initialized nil
)
123 (put task
'exec-fun exec-fun
)
125 (when (functionp post-exec
)
126 (put task
'post-exec post-exec
))
127 (when (functionp on-event
)
128 (put task
'on-event on-event
))
129 (put task
'results nil
)
130 (put task
'status
'run
)
131 (put task
'queue nil
)
133 ;; We need to get the tunnel if it exists so we can send the right
134 ;; `service' to the remote.
135 (when (and username hostname
)
137 (setq ssh-args
(list "-R" (format "%s:localhost:%s" hostport
138 (process-contact parallel--server
:service
)))
140 (setq tunnel
(parallel-make-tunnel username hostname
)
141 hostport
(process-get tunnel
'service
)))
142 (setq ssh-args
(append
144 (if graphical
(list "-X"))
145 (list (format "%s@%s" username hostname
)))))
146 (setq emacs-args
(remq nil
147 (list* "-Q" "-l" library-path
148 (if graphical nil
"-batch")
149 "--eval" (format "(setq parallel-service '%S)"
152 (process-contact parallel--server
:service
)))
153 "--eval" (format "(setq parallel-task-id '%S)" task
)
154 "--eval" (format "(setq debug-on-error '%S)" debug
)
155 "--eval" (format "(setq parallel-continue-when-executed '%S)" continue-when-executed
)
156 "-f" "parallel-remote--init"
159 ;; Reformat emacs-args if we use a tunnel (escape string)
161 (setq emacs-args
(list (mapconcat (lambda (string)
163 (prin1-to-string string
)
166 (setq proc
(apply #'start-process
"parallel" nil
168 (list* "ssh" ssh-args
))
171 (put task
'proc proc
)
172 (set-process-sentinel (get task
'proc
) #'parallel--sentinel
)
174 (run-at-time timeout nil
(lambda ()
175 (when (memq (parallel-status task
)
177 (parallel-stop task
)))))
(defun parallel--new-task ()
  "Return a freshly interned symbol to be used as a task.
Candidate names come from `make-temp-name' with the prefix
\"parallel-task-\"; a candidate is retried for as long as a symbol
of that name is already interned."
  (let (candidate)
    ;; Draw a name, then keep drawing while it collides with an existing
    ;; interned symbol.
    (while (progn
             (setq candidate (make-temp-name "parallel-task-"))
             (intern-soft candidate)))
    (intern candidate)))
;; (Re)creates the server process stored in `parallel--server'.  The guard
;; fires when the variable is nil or when a test on the server's
;; `process-status' fails; the new process is made with
;; `make-network-process', named "parallel-server", with `parallel--filter'
;; as its filter and `:filter-multibyte' set to t.
;; NOTE(review): the status value compared against and the remaining
;; `make-network-process' arguments are not visible in this text -- confirm.
187 (defun parallel--init-server ()
188 "Initialize `parallel--server'."
189 (when (or (null parallel--server
)
190 (not (eq (process-status parallel--server
)
192 (setq parallel--server
193 (make-network-process :name
"parallel-server"
199 :filter
#'parallel--filter
200 :filter-multibyte t
))))
;; Searches with `find-if' for the task symbol whose `proc' property is `eq'
;; to PROC.
;; NOTE(review): the sequence being searched is not visible in this text;
;; presumably `parallel--tasks' -- confirm.
202 (defun parallel--get-task-process (proc)
203 "Return the task running the given PROC."
204 (find-if (lambda (task)
205 (eq (get task
'proc
) proc
))
;; Process sentinel installed by `parallel-start' on the remote process.
;; The event string is not used by the visible code.
;;
;; It only acts once PROC's `process-status' is `exit' or `signal':
;;  - finds the owning task with `parallel--get-task-process';
;;  - starts from the task's stored `results' and PROC's process status;
;;  - if the exit status is zero the status becomes `success', otherwise the
;;    exit status is pushed onto the results;
;;  - writes `results' and `status' back onto the task;
;;  - calls the task's `post-exec' property when it is a function;
;;  - removes the task from `parallel--tasks'.
;; NOTE(review): the arguments passed to `post-exec' and the end of the
;; docstring are not visible in this text -- confirm.
208 (defun parallel--sentinel (proc _event
)
209 "Sentinel to watch over the remote process.
211 This function do the necessary cleanup when the remote process is
213 (when (memq (process-status proc
) '(exit signal
))
214 (let* ((task (parallel--get-task-process proc
))
215 (results (get task
'results
))
216 (status (process-status proc
)))
217 ;; 0 means that the remote process has terminated normally (no
219 (if (zerop (process-exit-status proc
))
220 (setq status
'success
)
221 ;; on failure, push the exit-code or signal number on the
223 (push (process-exit-status proc
) results
))
224 (put task
'results results
)
225 (put task
'status status
)
227 (when (functionp (get task
'post-exec
))
228 (funcall (get task
'post-exec
)
230 (setq parallel--tasks
(delq task parallel--tasks
)))))
(defun parallel--call-with-env (fun env)
  "Build the text of a form that calls FUN on the elements of ENV.
The returned string reads as (funcall (read \"FUN\") 'ARG1 'ARG2 ...):
FUN is shipped in its printed representation and each element of
ENV is printed and quoted, so the remote process can READ and EVAL
the result."
  (let* ((printed-fun (prin1-to-string fun))
         (quote-arg
          ;; Each argument must be quoted because the receiving side
          ;; evaluates the form after reading it.
          (lambda (arg) (format "'%S" arg)))
         (printed-args (mapconcat quote-arg env " ")))
    (format "(funcall (read %S) %s)" printed-fun printed-args)))
(defun parallel--filter (connection output)
  "Filter of the server process: dispatch what a remote process sent.
OUTPUT is decoded by `parallel--read-output' into a list of
entries; the head of each entry is the task and the remainder its
result.  Every entry is passed, together with CONNECTION, to
`parallel--process-output'."
  (dolist (entry (parallel--read-output output))
    (let ((task (car entry))
          (result (cdr entry)))
      (parallel--process-output connection task result))))
;; Handles one (TASK . RESULT) entry received on CONNECTION.
;;
;; Visible behaviour:
;;  - CONNECTION is always stored in TASK's `connection' property;
;;  - a `cond' branch whose test includes "TASK is not yet `initialized'"
;;    calls `parallel-send', then pops every (FUN . ENV) entry off TASK's
;;    `queue' and sends it with `parallel-send', then marks TASK
;;    `initialized';
;;  - further down, RESULT is pushed onto TASK's `results' and, when the
;;    `on-event' property is a function, that function is called with
;;    RESULT.
;; NOTE(review): the rest of the first test, the arguments of the first
;; `parallel-send' call and the test of the second branch are not visible in
;; this text -- confirm.
248 (defun parallel--process-output (connection task result
)
249 (put task
'connection connection
)
250 (cond ((and (not (get task
'initialized
))
252 (apply #'parallel-send
257 (while (setq code
(pop (get task
'queue
)))
258 (apply #'parallel-send task
(car code
) (cdr code
))))
259 (put task
'initialized t
))
261 (push result
(get task
'results
))
262 (if (functionp (get task
'on-event
))
263 (funcall (get task
'on-event
) result
)))))
;; Non-nil when `parallel-status' of TASK is one of `success', `exit' or
;; `signal'; being a `memq' call, the non-nil value is the matching tail of
;; that list rather than t.
265 (defun parallel-ready-p (task)
266 "Determine whether TASK is finished and if the results are
268 (memq (parallel-status task
) '(success exit signal
)))
(defun parallel-get-result (task)
  "Return the most recent result sent by TASK's remote call.
This is the head of the list returned by `parallel-get-results',
i.e. the value returned by exec-fun."
  (let ((all-results (parallel-get-results task)))
    (car all-results)))
;; Results are accumulated with `push' onto the task's `results' property
;; (see `parallel--process-output' and `parallel--sentinel'), so the newest
;; entry comes first; `parallel-get-result' takes the head of this
;; function's return value.
;; NOTE(review): the body is not visible in this text -- confirm what it
;; does before returning.
275 (defun parallel-get-results (task)
276 "Return all results send during the call of exec-fun."
;; Returns t when `parallel-status' of TASK is the symbol `success', which
;; `parallel--sentinel' stores when the process exit status is zero.
;; NOTE(review): a line between the docstring and this test appears to be
;; missing from this text -- confirm.
280 (defun parallel-success-p (task)
281 "Determine whether TASK has ended successfully."
283 (eq (parallel-status task
) 'success
))
;; The `status' property of a task is written elsewhere in this file:
;; `parallel-start' sets it to `run', and `parallel--sentinel' sets it to
;; `success' or to the process status (`exit' / `signal').
;; NOTE(review): the body is not visible in this text; presumably it reads
;; that property -- confirm.
285 (defun parallel-status (task)
286 "Return TASK status."
;; Blocks by polling: as long as `parallel-ready-p' is nil for TASK, sleeps
;; `parallel-sleep' seconds with `sleep-for' and checks again.
;; NOTE(review): the docstring and whatever follows the loop (the return
;; value) are not visible in this text -- confirm.
289 (defun parallel-wait (task)
291 (while (not (parallel-ready-p task
))
292 (sleep-for parallel-sleep
))
;; Kills TASK's process: the object stored in its `proc' property (set by
;; `parallel-start') is passed to `delete-process'.
295 (defun parallel-stop (task)
297 (delete-process (get task
'proc
)))
;; Reads TASK's `connection' property (stored by
;; `parallel--process-output').  Two outcomes are visible: a string built by
;; `parallel--call-with-env' from FUN and ENV, and a (FUN . ENV) pair pushed
;; onto TASK's `queue', which `parallel--process-output' later drains.
;; NOTE(review): the test choosing between the two and the call that
;; consumes the string are not visible in this text; presumably the pair is
;; queued while no connection exists yet -- confirm.
299 (defun parallel-send (task fun
&rest env
)
300 "Send FUN to be evaluated by TASK in ENV."
301 (let ((connection (get task
'connection
)))
305 (parallel--call-with-env fun env
))
306 (push (cons fun env
) (get task
'queue
)))))
310 ;;; parallel.el ends here