Merge branch 'master' into xwidget
[emacs.git] / lisp / emacs-parallel / parallel.el
bloba6c77eac26b80472f4972c1017c57e1078eca56d
1 ;; -*- lexical-binding: t; -*-
2 ;;; parallel.el ---
4 ;; Copyright (C) 2013 Grégoire Jadi
6 ;; Author: Grégoire Jadi <gregoire.jadi@gmail.com>
8 ;; This program is free software: you can redistribute it and/or
9 ;; modify it under the terms of the GNU General Public License as
10 ;; published by the Free Software Foundation, either version 3 of
11 ;; the License, or (at your option) any later version.
13 ;; This program is distributed in the hope that it will be useful,
14 ;; but WITHOUT ANY WARRANTY; without even the implied warranty of
15 ;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 ;; GNU General Public License for more details.
18 ;; You should have received a copy of the GNU General Public License
19 ;; along with this program. If not, see <http://www.gnu.org/licenses/>.
21 ;;; Commentary:
23 ;;; Code:
25 (require 'cl)
26 (require 'parallel-remote)
;;; Customization

(defgroup parallel nil
  "Run code in separate Emacs processes."
  :group 'emacs)

(defcustom parallel-sleep 0.05
  "Number of seconds `parallel-wait' sleeps between two polls of a task."
  :group 'parallel
  :type 'number)

(defcustom parallel-config nil
  "Property list of default options for `parallel-start'.
An option is looked up here only when it was given neither as a
keyword argument nor in the CONFIG plist of the call."
  :group 'parallel
  :type 'plist)

;;; Internal state

(defvar parallel--server nil
  "Network server process created by `parallel--init-server', or nil.")

(defvar parallel--tasks nil
  "List of task symbols registered by `parallel-start'.
`parallel--sentinel' removes a task once its process has finished.")

(defvar parallel--tunnels nil
  "List of ssh processes created by `parallel-make-tunnel'.")

;; Declare external function
(declare-function parallel-send "parallel-remote")
(defun parallel-make-tunnel (username hostname)
  "Return an ssh reverse tunnel to USERNAME@HOSTNAME, creating it if needed.

The tunnel is an `ssh -N -R 0:localhost:PORT USERNAME@HOSTNAME'
process, where PORT is the service of `parallel--server'.  A live
tunnel already present in `parallel--tunnels' for the same USERNAME
and HOSTNAME is reused.  Otherwise a new ssh process is started and
this function blocks until `parallel--tunnel-filter' has stored the
`service' property on it; only then is the tunnel registered in
`parallel--tunnels'.

Signal an error if the ssh process exits before `service' is set."
  (parallel--init-server)
  ;; Forget tunnels whose ssh process has died: reusing one would hand
  ;; out a `service' that nothing forwards any more.
  (setq parallel--tunnels (remove-if-not #'process-live-p parallel--tunnels))
  (let ((tunnel (find-if (lambda (tun)
                           (and (string= username
                                         (process-get tun 'username))
                                (string= hostname
                                         (process-get tun 'hostname))))
                         parallel--tunnels)))
    (unless tunnel
      (setq tunnel (start-process "parallel-ssh" nil "ssh"
                                  "-N" "-R" (format "0:localhost:%s"
                                                    (process-contact parallel--server :service))
                                  (format "%s@%s" username hostname)))
      (process-put tunnel 'username username)
      (process-put tunnel 'hostname hostname)
      (set-process-filter tunnel #'parallel--tunnel-filter)
      ;; Wait for the filter to record the port, but give up as soon as
      ;; ssh is gone (bad host, refused authentication, ...) instead of
      ;; spinning forever.
      (while (and (null (process-get tunnel 'service))
                  (process-live-p tunnel))
        (accept-process-output tunnel 0.01))
      (unless (process-get tunnel 'service)
        (error "parallel: ssh tunnel to %s@%s exited before reporting a port"
               username hostname))
      (push tunnel parallel--tunnels))
    tunnel))
(defun parallel-stop-tunnel (tunnel)
  "Remove TUNNEL from `parallel--tunnels' and delete its process."
  (let ((remaining (delq tunnel parallel--tunnels)))
    (setq parallel--tunnels remaining))
  (delete-process tunnel))
(defun parallel--tunnel-filter (proc output)
  "Process filter recording the port ssh allocated for tunnel PROC.

OUTPUT is a chunk of text printed by the ssh process.  When ssh
announces \"Allocated port N for remote forward\", N is stored (as
a string) in the `service' property of PROC.  Other output, such
as host-key warnings that happen to contain digits, is not
mistaken for a port.  Chunks are accumulated in the
`pending-output' property until the announcement is complete,
because a process filter may receive a line in several pieces.
Once `service' is set, further output is ignored."
  (unless (process-get proc 'service)
    (let ((pending (concat (process-get proc 'pending-output) output)))
      (if (string-match "Allocated port \\([0-9]+\\) for remote forward"
                        pending)
          (progn
            (process-put proc 'service (match-string 1 pending))
            (process-put proc 'pending-output nil))
        (process-put proc 'pending-output pending)))))
(defmacro parallel--set-option (place config)
  "Give the variable PLACE a fallback value when it is nil.

PLACE is an unevaluated symbol.  If its value is nil, it is set to
the value stored under the keyword :PLACE in the plist CONFIG or,
failing that, in `parallel-config'.  A non-nil value is kept."
  (let ((key (intern (concat ":" (symbol-name place)))))
    `(setf ,place (or ,place
                      (plist-get ,config ,key)
                      (plist-get parallel-config ,key)))))
(defmacro parallel--set-options (config &rest options)
  "Apply `parallel--set-option' to each symbol in OPTIONS, using CONFIG.
The expansion is a `progn' with one `parallel--set-option' form per
option, in the order given."
  (cons 'progn
        (mapcar (lambda (option)
                  (list 'parallel--set-option option config))
                options)))
(defun* parallel-start (exec-fun &key post-exec env timeout
                                 emacs-path library-path emacs-args
                                 graphical debug on-event continue-when-executed
                                 username hostname hostport
                                 config)
  "Start a child Emacs that runs EXEC-FUN and return the new task symbol.

Every keyword left nil falls back to the same keyword in the plist
CONFIG, then in `parallel-config'.

ENV is the list of arguments EXEC-FUN is called with.
POST-EXEC, if a function, is called by `parallel--sentinel' with the
  results list and the final status once the process has finished.
ON-EVENT, if a function, is called with each result received.
TIMEOUT, if non-nil, is passed to `run-at-time'; when it expires the
  task is stopped if its status is still `run' or `stop'.
EMACS-PATH is the program to start; it defaults to the running Emacs.
LIBRARY-PATH is the file loaded with `-l'; it defaults to the
  location of the `parallel-remote' library.  An error is signaled
  if it cannot be determined.
EMACS-ARGS is a list of extra command line arguments, added last.
GRAPHICAL non-nil omits `-batch' and, over ssh, adds `-X'.
DEBUG is the value given to `debug-on-error' in the child.
CONTINUE-WHEN-EXECUTED is the value given to
  `parallel-continue-when-executed' in the child.
USERNAME and HOSTNAME, when both non-nil, run the child through ssh.
  With HOSTPORT the ssh command itself forwards remote HOSTPORT to
  the local server; otherwise `parallel-make-tunnel' provides the
  forwarding and the port."
  (parallel--init-server)

  ;; Initialize parameters
  (parallel--set-options config
                         post-exec
                         env
                         timeout
                         emacs-args
                         graphical
                         debug
                         on-event
                         continue-when-executed
                         username
                         hostname
                         hostport)

  (setq emacs-path (or emacs-path
                       (plist-get config :emacs-path)
                       (plist-get parallel-config :emacs-path)
                       (expand-file-name invocation-name
                                         invocation-directory))
        library-path (or library-path
                         (plist-get config :library-path)
                         (plist-get parallel-config :library-path)
                         (locate-library "parallel-remote")))

  ;; A nil LIBRARY-PATH would be dropped by the `remq' below, leaving
  ;; `-l' to swallow the next argument.  Fail early, before any task
  ;; is registered.
  (unless library-path
    (error "parallel: cannot locate the `parallel-remote' library; pass :library-path"))

  (let ((task (parallel--new-task))
        proc tunnel ssh-args)
    (push task parallel--tasks)
    (put task 'initialized nil)
    (put task 'exec-fun exec-fun)
    (put task 'env env)
    (when (functionp post-exec)
      (put task 'post-exec post-exec))
    (when (functionp on-event)
      (put task 'on-event on-event))
    (put task 'results nil)
    (put task 'status 'run)
    (put task 'queue nil)

    ;; We need to get the tunnel if it exists so we can send the right
    ;; `service' to the remote.
    (when (and username hostname)
      (if hostport
          (setq ssh-args (list "-R" (format "%s:localhost:%s" hostport
                                            (process-contact parallel--server :service)))
                tunnel t)
        (setq tunnel (parallel-make-tunnel username hostname)
              hostport (process-get tunnel 'service)))
      (setq ssh-args (append
                      ssh-args
                      (if graphical (list "-X"))
                      (list (format "%s@%s" username hostname)))))
    (setq emacs-args (remq nil
                           (list* "-Q" "-l" library-path
                                  (if graphical nil "-batch")
                                  "--eval" (format "(setq parallel-service '%S)"
                                                   (if tunnel
                                                       hostport
                                                     (process-contact parallel--server :service)))
                                  "--eval" (format "(setq parallel-task-id '%S)" task)
                                  "--eval" (format "(setq debug-on-error '%S)" debug)
                                  "--eval" (format "(setq parallel-continue-when-executed '%S)" continue-when-executed)
                                  "-f" "parallel-remote--init"
                                  emacs-args)))

    ;; Over ssh the command line is re-parsed by the remote shell, so
    ;; every argument is quoted, not only those containing a quote
    ;; character: spaces, `$' or backquotes would otherwise be
    ;; interpreted remotely.  `shell-quote-argument' quotes for the
    ;; local system's shell; the remote is assumed to be compatible.
    (when tunnel
      (setq emacs-args (list (mapconcat #'shell-quote-argument
                                        emacs-args " "))))
    (setq proc (apply #'start-process "parallel" nil
                      `(,@(when tunnel
                            (list* "ssh" ssh-args))
                        ,emacs-path
                        ,@emacs-args)))
    (put task 'proc proc)
    (set-process-sentinel (get task 'proc) #'parallel--sentinel)
    (when timeout
      (run-at-time timeout nil (lambda ()
                                 (when (memq (parallel-status task)
                                             '(run stop))
                                   (parallel-stop task)))))
    task))
(defun parallel--new-task ()
  "Return a newly interned symbol to be used as a task.
Names are produced by `make-temp-name' with the prefix
\"parallel-task-\" until one is found that is not interned yet."
  (let (name)
    (while (progn
             (setq name (make-temp-name "parallel-task-"))
             (intern-soft name)))
    (intern name)))
(defun parallel--init-server ()
  "Make sure `parallel--server' is a listening network process.
Nothing is done when the variable already holds a process whose
status is `listen'.  Otherwise a new server is created on
localhost, on a port chosen by the system, with `parallel--filter'
as its filter, and stored in `parallel--server'."
  (unless (and parallel--server
               (eq (process-status parallel--server) 'listen))
    (setq parallel--server
          (make-network-process :name "parallel-server"
                                :buffer nil
                                :server t
                                :host "localhost"
                                :service t
                                :family 'ipv4
                                :filter #'parallel--filter
                                :filter-multibyte t))))
(defun parallel--get-task-process (proc)
  "Return the first task in `parallel--tasks' whose `proc' property is PROC.
Return nil when no task matches."
  (let ((candidates parallel--tasks))
    (while (and candidates
                (not (eq (get (car candidates) 'proc) proc)))
      (setq candidates (cdr candidates)))
    (car candidates)))
(defun parallel--sentinel (proc _event)
  "Sentinel to watch over the remote process.

This function does the necessary cleanup when the remote process
PROC is finished, i.e. when its status is `exit' or `signal'.  The
task running PROC gets its final status: `success' when the exit
status is zero, the process status otherwise, in which case the
exit code or signal number is also pushed on the task's results.
The task's `post-exec' function, if any, is then called with the
results and the status, and the task is removed from
`parallel--tasks'.

Nothing is done when no task in `parallel--tasks' runs PROC."
  (when (memq (process-status proc) '(exit signal))
    (let ((task (parallel--get-task-process proc)))
      ;; Without this guard the bookkeeping below would be written to
      ;; the property list of the symbol `nil'.
      (when task
        (let ((results (get task 'results))
              (status (process-status proc)))
          ;; 0 means that the remote process has terminated normally (no
          ;; SIGNUM 0).
          (if (zerop (process-exit-status proc))
              (setq status 'success)
            ;; on failure, push the exit-code or signal number on the
            ;; results stack.
            (push (process-exit-status proc) results))
          (put task 'results results)
          (put task 'status status)

          ;; Unregister the task even if the user callback signals an
          ;; error, so it does not stay in `parallel--tasks' forever.
          (unwind-protect
              (when (functionp (get task 'post-exec))
                (funcall (get task 'post-exec)
                         results status))
            (setq parallel--tasks (delq task parallel--tasks))))))))
(defun parallel--call-with-env (fun env)
  "Return the text of a form that calls FUN with the elements of ENV.
The text is meant to be READ and then EVALuated by the remote
process.  FUN is embedded in printed form inside a `read' call and
each element of ENV is printed with a leading quote."
  (let ((printed-fun (prin1-to-string fun))
        ;; Every argument is quoted because the remote process will
        ;; READ/EVAL the whole form.
        (quoted-args (mapconcat (lambda (arg)
                                  (concat "'" (prin1-to-string arg)))
                                env " ")))
    (format "(funcall (read %S) %s)" printed-fun quoted-args)))
(defun parallel--filter (connection output)
  "Filter of the server: dispatch what a remote process sent in OUTPUT.
OUTPUT is decoded with `parallel--read-output'; each element of the
resulting list is a cons whose car is the task and whose cdr is the
result, and is handed to `parallel--process-output' together with
CONNECTION."
  (dolist (datum (parallel--read-output output))
    (let ((task (car datum))
          (result (cdr datum)))
      (parallel--process-output connection task result))))
(defun parallel--process-output (connection task result)
  "Handle one RESULT received from TASK over CONNECTION.

CONNECTION is stored in the `connection' property of TASK, which is
where `parallel-send' looks for it.

While TASK is not yet initialized, a RESULT equal to the symbol
`code' is the remote's request for work: the task's `exec-fun' is
sent with its `env', followed by every call waiting in the task's
`queue', oldest first, and TASK is marked initialized.

Any other RESULT is pushed on the task's `results' and passed to
its `on-event' function, if there is one."
  (put task 'connection connection)
  (cond ((and (not (get task 'initialized))
              (eq result 'code))
         (apply #'parallel-send
                task
                (get task 'exec-fun)
                (get task 'env))
         ;; `parallel-send' pushes waiting calls on the front of the
         ;; queue, so the list is newest-first; reverse it to replay
         ;; the calls in the order they were made.
         (let ((pending (reverse (get task 'queue))))
           (put task 'queue nil)
           (dolist (call pending)
             (apply #'parallel-send task (car call) (cdr call))))
         (put task 'initialized t))
        (t
         (push result (get task 'results))
         (if (functionp (get task 'on-event))
             (funcall (get task 'on-event) result)))))
(defun parallel-ready-p (task)
  "Return non-nil if TASK is finished and its results are available.
That is the case when the status of TASK is `success', `exit' or
`signal'."
  (let ((status (parallel-status task)))
    (memq status '(success exit signal))))
(defun parallel-get-result (task)
  "Wait for TASK and return the most recent entry of its results.
This is the first element of the list returned by
`parallel-get-results'."
  (car (parallel-get-results task)))
(defun parallel-get-results (task)
  "Wait for TASK, then return the list stored in its `results' property.
Entries are pushed as they arrive, so the most recent one is first."
  (prog2 (parallel-wait task)
      (get task 'results)))
(defun parallel-success-p (task)
  "Wait for TASK, then return t if its status is `success', else nil."
  (parallel-wait task)
  (let ((status (parallel-status task)))
    (eq status 'success)))
(defun parallel-status (task)
  "Return the value of the `status' property of the symbol TASK."
  (get task 'status))
(defun parallel-wait (task)
  "Block until `parallel-ready-p' is non-nil for TASK, then return t.
TASK is polled every `parallel-sleep' seconds."
  (let ((done (parallel-ready-p task)))
    (while (not done)
      (sleep-for parallel-sleep)
      (setq done (parallel-ready-p task))))
  ;; Always return t, which reads better at the REPL.
  t)
(defun parallel-stop (task)
  "Stop TASK by deleting the process stored in its `proc' property."
  (let ((proc (get task 'proc)))
    (delete-process proc)))
(defun parallel-send (task fun &rest env)
  "Ask TASK to call FUN with the arguments ENV.
When TASK has a `connection', the call is formatted by
`parallel--call-with-env' and sent over it.  Otherwise the call is
stored as (FUN . ENV) at the front of the task's `queue' property."
  (let ((connection (get task 'connection)))
    (cond ((null connection)
           (push (cons fun env) (get task 'queue)))
          (t
           (process-send-string connection
                                (parallel--call-with-env fun env))))))
308 (provide 'parallel)
310 ;;; parallel.el ends here