Extensions: change the constant for the complete status
[blender-addons-contrib.git] / netrender / slave.py
blob7788e471ed283ba41786c929d5a7c32d3474663d
1 # ##### BEGIN GPL LICENSE BLOCK #####
3 # This program is free software; you can redistribute it and/or
4 # modify it under the terms of the GNU General Public License
5 # as published by the Free Software Foundation; either version 2
6 # of the License, or (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program; if not, write to the Free Software Foundation,
15 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17 # ##### END GPL LICENSE BLOCK #####
19 import sys, os, platform, shutil
20 import http, http.client, http.server
21 import subprocess, time, threading
22 import json
24 import bpy
26 from netrender.utils import *
27 import netrender.model
28 import netrender.repath
29 import netrender.baking
30 import netrender.thumbnail as thumbnail
# --- connection / back-off tuning -------------------------------------------
# Maximum number of reconnection attempts made by render_slave() after the
# first connection to the master failed.
MAX_CONNECT_TRY = 10
# Step and upper bound handed to BreakableIncrementedSleep by render_slave().
# NOTE(review): presumably both are expressed in seconds -- confirm against
# netrender.utils.BreakableIncrementedSleep.
INCREMENT_TIMEOUT = 1
MAX_TIMEOUT = 10

# --- job monitoring ----------------------------------------------------------
# Seconds between two log uploads / cancellation checks while a job is running
# (the monitoring loop itself wakes up every CANCEL_POLL_SPEED / 2 seconds).
CANCEL_POLL_SPEED = 2
def clearSlave(path):
    """Recursively delete the directory tree rooted at *path*.

    Errors raised by shutil.rmtree (for instance when *path* does not exist)
    are propagated to the caller.
    """
    shutil.rmtree(path)
def slave_Info(netsettings):
    """Build the RenderSlave record describing the local machine.

    The slave is named after the node name reported by platform.uname() and
    its stats string concatenates system, release, machine and processor.

    Tags come from ``netsettings.slave_tags`` (a ";"-separated string, used
    only when non-empty); TAG_BAKING / TAG_RENDER are added on top when
    ``netsettings.slave_bake`` / ``netsettings.slave_render`` are set.

    Returns the populated netrender.model.RenderSlave.
    """
    sysname, nodename, release, _version, machine, processor = platform.uname()

    slave = netrender.model.RenderSlave()
    slave.name = nodename
    slave.stats = " ".join((sysname, release, machine, processor))

    user_tags = netsettings.slave_tags
    if user_tags:
        slave.tags = set(user_tags.split(";"))

    # capability tags advertised to the master
    capabilities = (
        (netsettings.slave_bake, netrender.model.TAG_BAKING),
        (netsettings.slave_render, netrender.model.TAG_RENDER),
    )
    for enabled, tag in capabilities:
        if enabled:
            slave.tags.add(tag)

    return slave
def testCancel(conn, job_id, frame_number):
    """Ask the master whether the given job frame has been cancelled.

    Sends a HEAD request on "/status" for *job_id* / *frame_number* over
    *conn* and returns True when the reply status is NO_CONTENT, False
    otherwise.
    """
    query = {"job-id": job_id, "job-frame": str(frame_number)}
    with ConnectionContext():
        conn.request("HEAD", "/status", headers=query)

    # canceled if job isn't found anymore
    return responseStatus(conn) == http.client.NO_CONTENT
def testFile(conn, job_id, slave_id, rfile, job_prefix, main_path=None):
    """Make sure *rfile* exists locally, downloading it from the master if needed.

    The expected local path is computed with createLocalPath().  An existing
    file is reused unless *rfile* carries a signature that does not match the
    file's hash, in which case the stale copy is deleted.  Missing files are
    fetched from the master into a temporary file under *job_prefix* and then
    moved to their final location.

    On success ``rfile.filepath`` is updated and the local path is returned.
    Returns None (leaving ``rfile.filepath`` untouched) when the master does
    not answer the download request with OK.
    """
    job_full_path = createLocalPath(rfile, job_prefix, main_path, rfile.force)

    found = os.path.exists(job_full_path)

    if found and rfile.signature is not None:
        found = hashFile(job_full_path) == rfile.signature

        if not found:
            print("Found file %s at %s but signature mismatch!" % (rfile.filepath, job_full_path))
            os.remove(job_full_path)

    if not found:
        # Force prefix path if not found
        job_full_path = createLocalPath(rfile, job_prefix, main_path, True)
        print("Downloading", job_full_path)
        temp_path = os.path.join(job_prefix, "slave.temp")
        with ConnectionContext():
            conn.request("GET", fileURL(job_id, rfile.index), headers={"slave-id": slave_id})
        response = conn.getresponse()

        if response.status != http.client.OK:
            # Drain the body: http.client refuses to hand out the next
            # response on this connection while this one is still unread.
            response.read()
            return None # file for job not returned by server, need to return an error code to server

        # The context manager closes the temp file even if reading from the
        # socket or writing to disk fails half-way through.
        with open(temp_path, "wb") as temp_file:
            buf = response.read(1024)

            while buf:
                temp_file.write(buf)
                buf = response.read(1024)

        os.renames(temp_path, job_full_path)

    rfile.filepath = job_full_path

    return job_full_path
def breakable_timeout(timeout, test_break=None, interval=1):
    """Sleep for up to *timeout* intervals, stopping early on request.

    timeout    -- number of intervals to wait.
    test_break -- optional zero-argument callable (e.g. ``engine.test_break``)
                  polled after every interval; a true result ends the wait.
                  When omitted the full duration is always slept.
    interval   -- length of one interval in seconds (default 1).

    The previous implementation read an ``engine`` name that was neither a
    parameter nor a module global, so it raised NameError after the first
    second; the break test is now passed in explicitly.
    """
    for _ in range(timeout):
        time.sleep(interval)
        if test_break is not None and test_break():
            break
class _ProcessData:
    """State shared between the job monitoring loop and the output reader thread."""

    def __init__(self):
        self.lock = threading.Lock()    # guards self.stdout
        self.stdout = bytes()           # output collected since the last log upload
        self.cancelled = False          # set by the monitoring loop to stop the reader
        self.start_time = time.time()
        self.last_time = time.time()    # time of the last log upload / cancel check


def _pump_process_output(process, data):
    """Reader thread body: append the child's output to ``data.stdout``.

    Runs until the child has exited or ``data.cancelled`` is set.  Note that
    ``read(1024)`` can block while waiting for the child's output.
    """
    while not data.cancelled and process.poll() is None:
        buf = process.stdout.read(1024)

        with data.lock:
            data.stdout += buf


def _report_frame_errors(conn, job, slave_id, job_time):
    """Send a FRAME_ERROR result to the master for every frame of *job*."""
    headers = {
        "job-id": job.id,
        "slave-id": slave_id,
        "job-time": str(job_time),
        "job-result": str(netrender.model.FRAME_ERROR),
    }
    for frame in job.frames:
        headers["job-frame"] = str(frame.number)
        # send error result back to server
        with ConnectionContext():
            conn.request("PUT", "/render", headers=headers)
        responseStatus(conn)


def render_slave(engine, netsettings, threads):
    """Run the render slave main loop until the engine requests a break.

    engine      -- render engine object; its ``test_break()`` and
                   ``update_stats()`` methods are used for cancellation and
                   status reporting.
    netsettings -- network render settings (working path, tags, log/thumbnail
                   and clean-up options, connection parameters).
    threads     -- thread count passed to the Blender subprocess ("-t").

    The slave registers itself on the master, then repeatedly asks for a job,
    fetches the job files, runs the job in a subprocess while streaming its
    output to the master as a log, and finally uploads the results (or an
    error status) for every frame of the job.

    Returns None.  The function returns early when the working path is
    missing or not writable, or when no connection to the master could be
    established.
    """
    bisleep = BreakableIncrementedSleep(INCREMENT_TIMEOUT, 1, MAX_TIMEOUT, engine.test_break)

    engine.update_stats("", "Network render node initiation")

    slave_path = bpy.path.abspath(netsettings.path)

    if not os.path.exists(slave_path):
        print("Slave working path ( %s ) doesn't exist" % netsettings.path)
        return

    if not os.access(slave_path, os.W_OK):
        print("Slave working path ( %s ) is not writable" % netsettings.path)
        return

    conn = clientConnection(netsettings)

    if not conn:
        print("Connection failed, will try connecting again at most %i times" % MAX_CONNECT_TRY)
        bisleep.reset()

        for i in range(MAX_CONNECT_TRY):
            bisleep.sleep()

            conn = clientConnection(netsettings)

            if conn or engine.test_break():
                break

            print("Retry %i failed, waiting %is before retrying" % (i + 1, bisleep.current))

    if not conn:
        return

    # register this slave on the master
    with ConnectionContext():
        conn.request("POST", "/slave", json.dumps(slave_Info(netsettings).serialize()))
    response = conn.getresponse()
    response.read()

    slave_id = response.getheader("slave-id")

    NODE_PREFIX = os.path.join(slave_path, "slave_" + slave_id)
    verifyCreateDir(NODE_PREFIX)

    waiting_message = "Network render connected to master, waiting for jobs"
    engine.update_stats("", waiting_message)

    while not engine.test_break():
        with ConnectionContext():
            conn.request("GET", "/job", headers={"slave-id": slave_id})
        response = conn.getresponse()

        if response.status != http.client.OK:
            # no job available, back off before asking again
            bisleep.sleep()
            continue

        bisleep.reset()

        job = netrender.model.RenderJob.materialize(json.loads(str(response.read(), encoding='utf8')))
        engine.update_stats("", "Network render processing job from master")

        job_prefix = os.path.join(NODE_PREFIX, "job_" + job.id)
        verifyCreateDir(job_prefix)

        def abort_job(reason):
            # A job that cannot even be started is reported as failed on all
            # of its frames instead of crashing the whole slave.
            print(reason)
            _report_frame_errors(conn, job, slave_id, 0)
            engine.update_stats("", waiting_message)

        # set tempdir for fsaa temp files
        # have to set environ var because render is done in a subprocess and that's the easiest way to propagate the setting
        os.environ["TMP"] = job_prefix

        if job.type == netrender.model.JOB_BLENDER:
            job_path = job.files[0].original_path # original path of the first file
            main_path, main_file = os.path.split(job_path)

            job_full_path = testFile(conn, job.id, slave_id, job.files[0], job_prefix)
            print("Fullpath", job_full_path)
            print("File:", main_file, "and %i other files" % (len(job.files) - 1,))

            if job_full_path is None:
                # testFile() returns None when the master did not send the file
                abort_job("Main file %s of job %s could not be retrieved from master" % (main_file, job.id))
                continue

            for rfile in job.files[1:]:
                testFile(conn, job.id, slave_id, rfile, job_prefix, main_path)
                print("\t", rfile.filepath)

            netrender.repath.update(job)

            engine.update_stats("", "Render File " + main_file + " for job " + job.id)
        elif job.type == netrender.model.JOB_VCS:
            if not job.version_info:
                # incorrect job type: a VCS job needs its version info
                abort_job("VCS job %s has no version information" % job.id)
                continue

            job_path = job.files[0].filepath # path of main file
            main_path, main_file = os.path.split(job_path)

            job.version_info.update()

            # For VCS jobs, file path is relative to the working copy path
            job_full_path = os.path.join(job.version_info.wpath, job_path)

            engine.update_stats("", "Render File " + main_file + " for job " + job.id)

        # announce log to master
        logfile = netrender.model.LogFile(job.id, slave_id, [frame.number for frame in job.frames])
        with ConnectionContext():
            conn.request("POST", "/log", bytes(json.dumps(logfile.serialize()), encoding='utf8'))
        response = conn.getresponse()
        response.read()

        first_frame = job.frames[0].number

        # start render
        # Reset for every job so a process left over from a previous
        # iteration can never be mistaken for the current one.
        process = None

        if job.rendersWithBlender():
            frame_args = []

            for frame in job.frames:
                print("frame", frame.number)
                frame_args += ["-f", str(frame.number)]

            with NoErrorDialogContext():
                process = subprocess.Popen(
                    [bpy.app.binary_path,
                     "-b",
                     "-y",
                     "-noaudio",
                     job_full_path,
                     "-t", str(threads),
                     "-o", os.path.join(job_prefix, "######"),
                     "-E", job.render,
                     "-F", "MULTILAYER",
                     ] + frame_args,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    )

        elif job.subtype == netrender.model.JOB_SUB_BAKING:
            tasks = []
            for frame in job.frames:
                tasks.append(netrender.baking.commandToTask(frame.command))

            with NoErrorDialogContext():
                process = netrender.baking.bake(job, tasks)

        elif job.type == netrender.model.JOB_PROCESS:
            command = job.frames[0].command
            with NoErrorDialogContext():
                process = subprocess.Popen(command.split(" "), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

        if process is None:
            if job.type == netrender.model.JOB_BLENDER:
                netrender.repath.reset(job)
            abort_job("Job %s cannot be run by this slave (unsupported type/subtype)" % job.id)
            continue

        headers = {"slave-id": slave_id}

        results = []

        # trailing, not yet newline-terminated part of the output seen so far
        line = ""

        data = _ProcessData()

        process_thread = threading.Thread(target=_pump_process_output, args=(process, data))

        process_thread.start()

        while not data.cancelled and process_thread.is_alive():
            time.sleep(CANCEL_POLL_SPEED / 2)
            current_time = time.time()
            data.cancelled = engine.test_break()
            if current_time - data.last_time > CANCEL_POLL_SPEED:

                # "with" guarantees the lock is released even if the upload
                # raises, so the reader thread can never be left stuck on it.
                with data.lock:
                    # update logs if needed
                    if data.stdout:
                        # (only need to update on one frame, they are linked
                        with ConnectionContext():
                            conn.request("PUT", logURL(job.id, first_frame), data.stdout, headers=headers)
                        responseStatus(conn)

                        # Chunks are cut at arbitrary byte offsets, so a
                        # multi-byte character may be split (and the child
                        # may not emit UTF-8 at all): never raise on decode.
                        stdout_text = str(data.stdout, encoding='utf8', errors='replace')

                        # Also output on console
                        if netsettings.use_slave_output_log:
                            print(stdout_text, end="")

                        lines = stdout_text.split("\n")
                        lines[0] = line + lines[0]
                        line = lines.pop()
                        if job.subtype == netrender.model.JOB_SUB_BAKING:
                            results.extend(netrender.baking.resultsFromOuput(lines))

                        data.stdout = bytes()

                data.last_time = current_time
                if testCancel(conn, job.id, first_frame):
                    engine.update_stats("", "Job canceled by Master")
                    data.cancelled = True

        # On cancellation, kill the child *before* joining the reader thread:
        # the thread may be blocked reading the child's output and would
        # otherwise delay the cancellation until the child writes or exits.
        if data.cancelled and process.poll() is None:
            try:
                process.terminate()
            except OSError:
                pass

        process_thread.join()

        if job.type == netrender.model.JOB_BLENDER:
            netrender.repath.reset(job)

        if data.cancelled:
            continue # to next job

        # read leftovers if needed
        data.stdout += process.stdout.read()

        # flush the rest of the logs
        stdout_text = str(data.stdout, encoding='utf8', errors='replace')

        # Also output on console
        if stdout_text and netsettings.use_slave_output_log:
            print(stdout_text, end="")

        # Prepending the pending partial line here means it is still handled
        # when the process produced no further output after the last poll.
        lines = (line + stdout_text).split("\n")
        if job.subtype == netrender.model.JOB_SUB_BAKING:
            results.extend(netrender.baking.resultsFromOuput(lines))

        if data.stdout:
            # (only need to update on one frame, they are linked
            with ConnectionContext():
                conn.request("PUT", logURL(job.id, first_frame), data.stdout, headers=headers)

            # same status testCancel() treats as "job is gone": skip the results
            if responseStatus(conn) == http.client.NO_CONTENT:
                continue

        total_t = time.time() - data.start_time

        avg_t = total_t / len(job.frames)

        status = process.returncode

        print("status", status)

        if status == 0: # non zero status is error
            headers = {
                "job-id": job.id,
                "slave-id": slave_id,
                "job-time": str(avg_t),
                "job-result": str(netrender.model.FRAME_DONE),
            }
            for index, frame in enumerate(job.frames):
                headers["job-frame"] = str(frame.number)
                if job.hasRenderResult():
                    # send image back to server

                    filename = os.path.join(job_prefix, "%06d.exr" % frame.number)

                    # thumbnail first
                    if netsettings.use_slave_thumb:
                        thumbname = thumbnail.generate(filename)
                        if thumbname:
                            sendFile(conn, "/thumb", thumbname, headers=headers)

                    sendFile(conn, "/render", filename, headers=headers)

                elif job.subtype == netrender.model.JOB_SUB_BAKING:
                    # results are (task index, file path) pairs; keep this frame's files
                    frame_results = [result_filepath for task_index, result_filepath in results if task_index == index]

                    for result_filepath in frame_results:
                        headers["result-filename"] = os.path.basename(result_filepath)
                        headers["job-finished"] = str(result_filepath == frame_results[-1])

                        sendFile(conn, "/result", result_filepath, headers=headers)

                elif job.type == netrender.model.JOB_PROCESS:
                    with ConnectionContext():
                        conn.request("PUT", "/render", headers=headers)
                    responseStatus(conn)
        else:
            _report_frame_errors(conn, job, slave_id, avg_t)

        engine.update_stats("", waiting_message)

    conn.close()

    if netsettings.use_slave_clear:
        clearSlave(NODE_PREFIX)
# Running this module directly is a no-op; the guard only keeps the file
# importable as a script without side effects.
if __name__ == "__main__":
    pass