1 # ##### BEGIN GPL LICENSE BLOCK #####
3 # This program is free software; you can redistribute it and/or
4 # modify it under the terms of the GNU General Public License
5 # as published by the Free Software Foundation; either version 2
6 # of the License, or (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program; if not, write to the Free Software Foundation,
15 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17 # ##### END GPL LICENSE BLOCK #####
19 import sys
, os
, platform
, shutil
20 import http
, http
.client
, http
.server
21 import subprocess
, time
, threading
26 from netrender
.utils
import *
27 import netrender
.model
28 import netrender
.repath
29 import netrender
.baking
30 import netrender
.thumbnail
as thumbnail
def slave_Info(netsettings):
    """Build the RenderSlave record describing this machine.

    Args:
        netsettings: settings object; ``slave_tags`` (";"-separated string),
            ``slave_bake`` and ``slave_render`` are read.

    Returns:
        The populated ``netrender.model.RenderSlave``. The caller in
        ``render_slave`` calls ``.serialize()`` on the result, so the
        record has to be returned.
    """
    # NOTE(review): nodename and version are unpacked but not used by the
    # visible code -- confirm whether the slave record should also carry
    # the host name.
    sysname, nodename, release, version, machine, processor = platform.uname()

    slave = netrender.model.RenderSlave()
    slave.stats = sysname + " " + release + " " + machine + " " + processor

    # User-supplied tags replace whatever RenderSlave() starts with.
    if netsettings.slave_tags:
        slave.tags = set(netsettings.slave_tags.split(";"))

    if netsettings.slave_bake:
        slave.tags.add(netrender.model.TAG_BAKING)

    if netsettings.slave_render:
        slave.tags.add(netrender.model.TAG_RENDER)

    return slave
def testCancel(conn, job_id, frame_number):
    """Ask the master whether a job frame has been cancelled.

    Sends ``HEAD /status`` with ``job-id`` and ``job-frame`` headers.

    Args:
        conn: open connection to the master (must provide ``request``).
        job_id: id of the job being rendered.
        frame_number: frame to query; sent as a string.

    Returns:
        True when ``responseStatus(conn)`` equals
        ``http.client.NO_CONTENT``, False otherwise.
    """
    with ConnectionContext():
        conn.request(
            "HEAD",
            "/status",
            headers={"job-id": job_id, "job-frame": str(frame_number)},
        )

    # canceled if job isn't found anymore
    return responseStatus(conn) == http.client.NO_CONTENT
def testFile(conn, job_id, slave_id, rfile, job_prefix, main_path=None):
    """Make sure a job file is available locally, downloading it if needed.

    The expected local path comes from ``createLocalPath``. An existing
    file is accepted unless ``rfile.signature`` is set and differs from
    ``hashFile`` of the local copy; a mismatching copy is deleted. A
    missing file is fetched from the master into ``<job_prefix>/slave.temp``
    and then moved into place.

    Args:
        conn: open connection to the master.
        job_id: id of the job the file belongs to.
        slave_id: this slave's id, sent as the ``slave-id`` header.
        rfile: file record; ``force``, ``signature``, ``filepath`` and
            ``index`` are read, and ``filepath`` is updated on success.
        job_prefix: working directory of the job.
        main_path: forwarded to ``createLocalPath``.

    Returns:
        The local path of the file, or None when the master does not
        answer the download request with ``http.client.OK``.
    """
    job_full_path = createLocalPath(rfile, job_prefix, main_path, rfile.force)

    found = os.path.exists(job_full_path)

    if found and rfile.signature is not None:
        found_signature = hashFile(job_full_path)
        found = found_signature == rfile.signature

        if not found:
            print("Found file %s at %s but signature mismatch!" % (rfile.filepath, job_full_path))
            os.remove(job_full_path)

    if not found:
        # Force prefix path if not found
        job_full_path = createLocalPath(rfile, job_prefix, main_path, True)
        print("Downloading", job_full_path)
        temp_path = os.path.join(job_prefix, "slave.temp")

        # NOTE(review): indentation was lost in this listing; only the
        # request is kept inside ConnectionContext -- confirm whether
        # getresponse() belongs inside as well.
        with ConnectionContext():
            conn.request("GET", fileURL(job_id, rfile.index), headers={"slave-id": slave_id})
        response = conn.getresponse()

        if response.status != http.client.OK:
            return None  # file for job not returned by server, need to return an error code to server

        # Stream to a temp file so a partial download never sits at the
        # final path; the file is closed before it is moved.
        with open(temp_path, "wb") as f:
            buf = response.read(1024)
            while buf:
                f.write(buf)
                buf = response.read(1024)

        os.renames(temp_path, job_full_path)

    rfile.filepath = job_full_path

    return job_full_path
def breakable_timeout(timeout):
    """Wait up to ``timeout`` seconds, stopping early on a break request.

    Args:
        timeout: number of one-second steps to wait; zero or a negative
            value returns immediately.

    NOTE(review): ``engine`` is not a parameter and no module-level binding
    is visible in this listing, so any call with ``timeout > 0`` depends on
    a global named ``engine`` existing -- confirm, or pass the engine in.
    NOTE(review): the one-second sleep per step is reconstructed from the
    function's name and its ``range(timeout)`` loop -- confirm.
    """
    for _ in range(timeout):
        time.sleep(1)
        if engine.test_break():
            break
# render_slave(engine, netsettings, threads): main loop of a network render
# slave. From the visible code: it checks the working path taken from
# netsettings.path, connects to the master, registers itself with POST /slave,
# then polls GET /job while engine.test_break() is false. For each job it
# prepares the files, starts a worker process, forwards the process output to
# the master's log URL, and finally reports a per-frame result.
#
# NOTE(review): this listing is token-split and appears to be missing
# statements: several `if`/`for` headers have no visible body, and
# `frame_args`, `tasks`, `results`, `line` and `data` are used without a
# visible initialisation. `threads` is not referenced in the visible code.
# `json` and `bpy` are used without a visible import (possibly provided by
# `from netrender.utils import *` -- confirm). Restore the missing statements
# before relying on this function.
113 def render_slave(engine
, netsettings
, threads
):
114 bisleep
= BreakableIncrementedSleep(INCREMENT_TIMEOUT
, 1, MAX_TIMEOUT
, engine
.test_break
)
116 engine
.update_stats("", "Network render node initiation")
# Resolve the configured working path; the two checks below report a path
# that does not exist or is not writable.
118 slave_path
= bpy
.path
.abspath(netsettings
.path
)
120 if not os
.path
.exists(slave_path
):
121 print("Slave working path ( %s ) doesn't exist" % netsettings
.path
)
124 if not os
.access(slave_path
, os
.W_OK
):
125 print("Slave working path ( %s ) is not writable" % netsettings
.path
)
# Connect to the master. The messages below describe up to MAX_CONNECT_TRY
# reconnection attempts; the loop presumably stops early once a connection
# exists or engine.test_break() is true (the body of that test is not visible).
128 conn
= clientConnection(netsettings
)
131 print("Connection failed, will try connecting again at most %i times" % MAX_CONNECT_TRY
)
134 for i
in range(MAX_CONNECT_TRY
):
137 conn
= clientConnection(netsettings
)
139 if conn
or engine
.test_break():
142 print("Retry %i failed, waiting %is before retrying" % (i
+ 1, bisleep
.current
))
# Register with the master: POST the serialized slave_Info() to /slave and
# read the assigned id from the "slave-id" response header.
145 with
ConnectionContext():
146 conn
.request("POST", "/slave", json
.dumps(slave_Info(netsettings
).serialize()))
147 response
= conn
.getresponse()
150 slave_id
= response
.getheader("slave-id")
# Per-slave working directory: <slave_path>/slave_<slave_id>.
152 NODE_PREFIX
= os
.path
.join(slave_path
, "slave_" + slave_id
)
153 verifyCreateDir(NODE_PREFIX
)
155 engine
.update_stats("", "Network render connected to master, waiting for jobs")
# Job loop: poll GET /job until the engine requests a break.
157 while not engine
.test_break():
158 with
ConnectionContext():
159 conn
.request("GET", "/job", headers
={"slave-id":slave_id
})
160 response
= conn
.getresponse()
162 if response
.status
== http
.client
.OK
:
165 job
= netrender
.model
.RenderJob
.materialize(json
.loads(str(response
.read(), encoding
='utf8')))
166 engine
.update_stats("", "Network render processing job from master")
# Per-job working directory: <NODE_PREFIX>/job_<job.id>.
168 job_prefix
= os
.path
.join(NODE_PREFIX
, "job_" + job
.id)
169 verifyCreateDir(job_prefix
)
171 # set tempdir for fsaa temp files
172 # have to set environ var because render is done in a subprocess and that's the easiest way to propagate the setting
173 os
.environ
["TMP"] = job_prefix
# JOB_BLENDER: make the main file and every other job file available locally
# through testFile(), then call netrender.repath.update(job).
176 if job
.type == netrender
.model
.JOB_BLENDER
:
177 job_path
= job
.files
[0].original_path
# original path of the first file
178 main_path
, main_file
= os
.path
.split(job_path
)
180 job_full_path
= testFile(conn
, job
.id, slave_id
, job
.files
[0], job_prefix
)
181 print("Fullpath", job_full_path
)
182 print("File:", main_file
, "and %i other files" % (len(job
.files
) - 1,))
184 for rfile
in job
.files
[1:]:
185 testFile(conn
, job
.id, slave_id
, rfile
, job_prefix
, main_path
)
186 print("\t", rfile
.filepath
)
188 netrender
.repath
.update(job
)
190 engine
.update_stats("", "Render File " + main_file
+ " for job " + job
.id)
# JOB_VCS: call job.version_info.update() and take the main file relative to
# job.version_info.wpath.
191 elif job
.type == netrender
.model
.JOB_VCS
:
192 if not job
.version_info
:
193 # Need to return an error to server, incorrect job type
196 job_path
= job
.files
[0].filepath
# path of main file
197 main_path
, main_file
= os
.path
.split(job_path
)
199 job
.version_info
.update()
201 # For VCS jobs, file path is relative to the working copy path
202 job_full_path
= os
.path
.join(job
.version_info
.wpath
, job_path
)
204 engine
.update_stats("", "Render File " + main_file
+ " for job " + job
.id)
206 # announce log to master
207 logfile
= netrender
.model
.LogFile(job
.id, slave_id
, [frame
.number
for frame
in job
.frames
])
208 with
ConnectionContext():
209 conn
.request("POST", "/log", bytes(json
.dumps(logfile
.serialize()), encoding
='utf8'))
210 response
= conn
.getresponse()
214 first_frame
= job
.frames
[0].number
217 start_t
= time
.time()
# Start the worker process for a Blender render: one "-f <frame>" pair is
# collected per frame. NOTE(review): the initialisation of frame_args is not
# visible, and part of the Popen argument list (between the binary path and
# "-o", and after it) appears to be missing from this listing.
219 if job
.rendersWithBlender():
222 for frame
in job
.frames
:
223 print("frame", frame
.number
)
224 frame_args
+= ["-f", str(frame
.number
)]
226 with
NoErrorDialogContext():
227 process
= subprocess
.Popen(
228 [bpy
.app
.binary_path
,
234 "-o", os
.path
.join(job_prefix
, "######"),
238 stdout
=subprocess
.PIPE
,
239 stderr
=subprocess
.STDOUT
,
# Baking sub-type: one task per frame command, handed to
# netrender.baking.bake(). NOTE(review): the initialisation of tasks is not
# visible.
242 elif job
.subtype
== netrender
.model
.JOB_SUB_BAKING
:
244 for frame
in job
.frames
:
245 tasks
.append(netrender
.baking
.commandToTask(frame
.command
))
247 with
NoErrorDialogContext():
248 process
= netrender
.baking
.bake(job
, tasks
)
# JOB_PROCESS: run the first frame's command, split on single spaces, as a
# subprocess with stderr merged into stdout.
250 elif job
.type == netrender
.model
.JOB_PROCESS
:
251 command
= job
.frames
[0].command
252 with
NoErrorDialogContext():
253 process
= subprocess
.Popen(command
.split(" "), stdout
=subprocess
.PIPE
, stderr
=subprocess
.STDOUT
)
255 headers
= {"slave-id":slave_id
}
# NOTE(review): these self.* assignments look like the body of an initialiser
# whose class/def header is not visible here; the same fields are later read
# and written through `data`.
263 self
.lock
= threading
.Lock()
264 self
.stdout
= bytes()
265 self
.cancelled
= False
266 self
.start_time
= time
.time()
267 self
.last_time
= time
.time()
# Target of process_thread: reads process.stdout in 1024-byte chunks while the
# process is running and data.cancelled is false. NOTE(review): what happens to
# buf after the read is not visible in this listing.
271 def run_process(process
, data
):
272 while not data
.cancelled
and process
.poll() is None:
273 buf
= process
.stdout
.read(1024)
279 process_thread
= threading
.Thread(target
=run_process
, args
=(process
, data
))
281 process_thread
.start()
# Supervise the reader thread: sleep CANCEL_POLL_SPEED / 2 per iteration, pick
# up engine.test_break(), and once more than CANCEL_POLL_SPEED seconds have
# passed since data.last_time, PUT the collected output to the job's log URL
# and ask testCancel() whether the master still knows the job.
283 while not data
.cancelled
and process_thread
.is_alive():
284 time
.sleep(CANCEL_POLL_SPEED
/ 2)
285 current_time
= time
.time()
286 data
.cancelled
= engine
.test_break()
287 if current_time
- data
.last_time
> CANCEL_POLL_SPEED
:
291 # update logs if needed
293 # (only need to update on one frame, they are linked
294 with
ConnectionContext():
295 conn
.request("PUT", logURL(job
.id, first_frame
), data
.stdout
, headers
=headers
)
298 stdout_text
= str(data
.stdout
, encoding
='utf8')
300 # Also output on console
301 if netsettings
.use_slave_output_log
:
302 print(stdout_text
, end
="")
304 lines
= stdout_text
.split("\n")
305 lines
[0] = line
+ lines
[0]
307 if job
.subtype
== netrender
.model
.JOB_SUB_BAKING
:
308 results
.extend(netrender
.baking
.resultsFromOuput(lines
))
310 data
.stdout
= bytes()
314 data
.last_time
= current_time
315 if testCancel(conn
, job
.id, first_frame
):
316 engine
.update_stats("", "Job canceled by Master")
317 data
.cancelled
= True
# Wait for the reader thread before inspecting the process and its remaining
# output.
319 process_thread
.join()
322 if job
.type == netrender
.model
.JOB_BLENDER
:
323 netrender
.repath
.reset(job
)
326 # kill process if needed
327 if process
.poll() is None:
332 continue # to next frame
334 # read leftovers if needed
335 data
.stdout
+= process
.stdout
.read()
337 # flush the rest of the logs
339 stdout_text
= str(data
.stdout
, encoding
='utf8')
341 # Also output on console
342 if netsettings
.use_slave_output_log
:
343 print(stdout_text
, end
="")
345 lines
= stdout_text
.split("\n")
346 lines
[0] = line
+ lines
[0]
347 if job
.subtype
== netrender
.model
.JOB_SUB_BAKING
:
348 results
.extend(netrender
.baking
.resultsFromOuput(lines
))
350 # (only need to update on one frame, they are linked
351 with
ConnectionContext():
352 conn
.request("PUT", logURL(job
.id, first_frame
), data
.stdout
, headers
=headers
)
354 if responseStatus(conn
) == http
.client
.NO_CONTENT
:
# The time reported to the master is the elapsed time since data.start_time
# divided by the number of frames in the job.
357 total_t
= time
.time() - data
.start_time
359 avg_t
= total_t
/ len(job
.frames
)
361 status
= process
.returncode
363 print("status", status
)
365 headers
= {"job-id":job
.id, "slave-id":slave_id
, "job-time":str(avg_t
)}
# Exit status 0: mark every frame FRAME_DONE and upload what the job produced
# (the rendered .exr plus an optional thumbnail, the baking result files, or
# only the status for JOB_PROCESS).
# NOTE(review): no close() is visible for the files opened with open(..., 'rb')
# below -- confirm they are closed after each upload.
368 if status
== 0: # non zero status is error
369 headers
["job-result"] = str(netrender
.model
.FRAME_DONE
)
370 for frame
in job
.frames
:
371 headers
["job-frame"] = str(frame
.number
)
372 if job
.hasRenderResult():
373 # send image back to server
375 filename
= os
.path
.join(job_prefix
, "%06d.exr" % frame
.number
)
378 if netsettings
.use_slave_thumb
:
379 thumbname
= thumbnail
.generate(filename
)
382 f
= open(thumbname
, 'rb')
383 with
ConnectionContext():
384 conn
.request("PUT", "/thumb", f
, headers
=headers
)
388 f
= open(filename
, 'rb')
389 with
ConnectionContext():
390 conn
.request("PUT", "/render", f
, headers
=headers
)
392 if responseStatus(conn
) == http
.client
.NO_CONTENT
:
395 elif job
.subtype
== netrender
.model
.JOB_SUB_BAKING
:
396 index
= job
.frames
.index(frame
)
398 frame_results
= [result_filepath
for task_index
, result_filepath
in results
if task_index
== index
]
400 for result_filepath
in frame_results
:
401 result_path
, result_filename
= os
.path
.split(result_filepath
)
402 headers
["result-filename"] = result_filename
403 headers
["job-finished"] = str(result_filepath
== frame_results
[-1])
405 f
= open(result_filepath
, 'rb')
406 with
ConnectionContext():
407 conn
.request("PUT", "/result", f
, headers
=headers
)
409 if responseStatus(conn
) == http
.client
.NO_CONTENT
:
412 elif job
.type == netrender
.model
.JOB_PROCESS
:
413 with
ConnectionContext():
414 conn
.request("PUT", "/render", headers
=headers
)
415 if responseStatus(conn
) == http
.client
.NO_CONTENT
:
# Failure path (presumably the branch for a non-zero status; the `else` is not
# visible in this listing): mark every frame FRAME_ERROR.
418 headers
["job-result"] = str(netrender
.model
.FRAME_ERROR
)
419 for frame
in job
.frames
:
420 headers
["job-frame"] = str(frame
.number
)
421 # send error result back to server
422 with
ConnectionContext():
423 conn
.request("PUT", "/render", headers
=headers
)
424 if responseStatus(conn
) == http
.client
.NO_CONTENT
:
427 engine
.update_stats("", "Network render connected to master, waiting for jobs")
# Optional cleanup of NODE_PREFIX through clearSlave(), controlled by
# netsettings.use_slave_clear (clearSlave is defined outside this listing).
433 if netsettings
.use_slave_clear
:
434 clearSlave(NODE_PREFIX
)
# Script entry guard.
# NOTE(review): no body for this guard is visible in this listing -- confirm
# what, if anything, should run when the module is executed directly.
436 if __name__
== "__main__":