math_vis: remove use of register_module
[blender-addons.git] / netrender / master.py
blob81e62dbb713c04d88d70c3d7b2b1fa4382477b52
1 # ##### BEGIN GPL LICENSE BLOCK #####
3 # This program is free software; you can redistribute it and/or
4 # modify it under the terms of the GNU General Public License
5 # as published by the Free Software Foundation; either version 2
6 # of the License, or (at your option) any later version.
7 #s
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program; if not, write to the Free Software Foundation,
15 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17 # ##### END GPL LICENSE BLOCK #####
19 import sys, os
20 import http, http.client, http.server, socket, socketserver
21 import shutil, time, hashlib
22 import pickle
23 import zipfile
24 import select # for select.error
25 import json
28 from netrender.utils import *
29 import netrender.model
30 import netrender.balancing
31 import netrender.master_html
32 import netrender.thumbnail as thumbnail
34 class MRenderFile(netrender.model.RenderFile):
35 def __init__(self, filepath, index, start, end, signature):
36 super().__init__(filepath, index, start, end, signature)
37 self.found = False
39 def updateStatus(self):
40 self.found = os.path.exists(self.filepath)
42 if self.found and self.signature is not None:
43 found_signature = hashFile(self.filepath)
44 self.found = self.signature == found_signature
45 if not self.found:
46 print("Signature mismatch", self.signature, found_signature)
48 return self.found
50 def test(self):
51 # don't check when forcing upload and only until found
52 if not self.force and not self.found:
53 self.updateStatus()
55 return self.found
58 class MRenderSlave(netrender.model.RenderSlave):
59 def __init__(self, slave_info):
60 super().__init__(slave_info)
61 self.id = hashlib.md5(bytes(repr(slave_info.name) + repr(slave_info.address), encoding='utf8')).hexdigest()
62 self.last_seen = time.time()
65 self.job = None
66 self.job_frames = []
68 netrender.model.RenderSlave._slave_map[self.id] = self
70 def seen(self):
71 self.last_seen = time.time()
73 def finishedFrame(self, frame_number):
74 try:
75 self.job_frames.remove(frame_number)
76 except ValueError as e:
77 print("Internal error: Frame %i not in job frames list" % frame_number)
78 print(self.job_frames)
79 if not self.job_frames:
80 self.job = None
82 class MRenderJob(netrender.model.RenderJob):
83 def __init__(self, job_id, job_info):
84 super().__init__(job_info)
85 self.id = job_id
86 self.last_dispatched = time.time()
87 self.start_time = time.time()
88 self.finish_time = self.start_time
89 # force one chunk for process jobs
90 if self.type == netrender.model.JOB_PROCESS:
91 self.chunks = 1
93 # Force WAITING status on creation
94 self.status = netrender.model.JOB_WAITING
96 # special server properties
97 self.last_update = 0
98 self.save_path = ""
99 self.files = [MRenderFile(rfile.filepath, rfile.index, rfile.start, rfile.end, rfile.signature) for rfile in job_info.files]
101 def setForceUpload(self, force):
102 for rfile in self.files:
103 rfile.force = force
105 def initInfo(self):
106 if not self.resolution:
107 self.resolution = tuple(getFileInfo(self.files[0].filepath, ["bpy.context.scene.render.resolution_x", "bpy.context.scene.render.resolution_y", "bpy.context.scene.render.resolution_percentage"]))
109 def save(self):
110 if self.save_path:
111 f = open(os.path.join(self.save_path, "job.txt"), "w")
112 f.write(json.dumps(self.serialize()))
113 f.close()
115 def edit(self, info_map):
116 if "status" in info_map:
117 self.status = info_map["status"]
119 if "priority" in info_map:
120 self.priority = info_map["priority"]
122 if "chunks" in info_map:
123 self.chunks = info_map["chunks"]
125 def testStart(self):
126 # Don't test files for versionned jobs
127 if not self.version_info:
128 for f in self.files:
129 if not f.test():
130 return False
132 self.start()
133 self.initInfo()
134 return True
136 def testFinished(self):
137 for f in self.frames:
138 if f.status in {netrender.model.FRAME_QUEUED, netrender.model.FRAME_DISPATCHED, netrender.model.FRAME_ERROR}:
139 break
140 else:
141 self.status = netrender.model.JOB_FINISHED
142 self.finish_time=time.time()
144 def pause(self, status = None):
145 if self.status not in {netrender.model.JOB_PAUSED, netrender.model.JOB_QUEUED}:
146 return
148 if status is None:
149 self.status = netrender.model.JOB_PAUSED if self.status == netrender.model.JOB_QUEUED else netrender.model.JOB_QUEUED
150 elif status:
151 self.status = netrender.model.JOB_QUEUED
152 else:
153 self.status = netrender.model.JOB_PAUSED
155 def start(self):
156 self.status = netrender.model.JOB_QUEUED
159 def addLog(self, frames):
160 frames = sorted(frames)
161 log_name = "%06d_%06d.log" % (frames[0], frames[-1])
162 log_path = os.path.join(self.save_path, log_name)
164 for number in frames:
165 frame = self[number]
166 if frame:
167 frame.log_path = log_path
169 def addFrame(self, frame_number, command):
170 frame = MRenderFrame(frame_number, command)
171 self.frames.append(frame)
172 return frame
174 def reset(self, all):
175 for f in self.frames:
176 f.reset(all)
178 if all:
179 self.status = netrender.model.JOB_QUEUED
181 def getFrames(self):
182 frames = []
183 for f in self.frames:
184 if f.status == netrender.model.FRAME_QUEUED:
185 self.last_dispatched = time.time()
186 frames.append(f)
187 if len(frames) >= self.chunks:
188 break
190 return frames
192 def getResultPath(self, filename):
193 return os.path.join(self.save_path, filename)
195 class MRenderFrame(netrender.model.RenderFrame):
196 def __init__(self, frame, command):
197 super().__init__()
198 self.number = frame
199 self.slave = None
200 self.time = 0
201 self.status = netrender.model.FRAME_QUEUED
202 self.command = command
204 self.log_path = None
206 def addDefaultRenderResult(self):
207 self.results.append(self.getRenderFilename())
209 def getRenderFilename(self):
210 return "%06d.exr" % self.number
212 def reset(self, all):
213 if all or self.status == netrender.model.FRAME_ERROR:
214 self.log_path = None
215 self.slave = None
216 self.time = 0
217 self.status = netrender.model.FRAME_QUEUED
220 # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
221 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
222 # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
223 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
224 file_pattern = re.compile("/file_([a-zA-Z0-9]+)_([0-9]+)")
225 render_pattern = re.compile("/render_([a-zA-Z0-9]+)_([0-9]+).exr")
226 result_pattern = re.compile("/result_([a-zA-Z0-9]+).zip")
227 thumb_pattern = re.compile("/thumb_([a-zA-Z0-9]+)_([0-9]+).jpg")
228 log_pattern = re.compile("/log_([a-zA-Z0-9]+)_([0-9]+).log")
229 reset_pattern = re.compile("/reset(all|)_([a-zA-Z0-9]+)_([0-9]+)")
230 cancel_pattern = re.compile("/cancel_([a-zA-Z0-9]+)")
231 pause_pattern = re.compile("/pause_([a-zA-Z0-9]+)")
232 edit_pattern = re.compile("/edit_([a-zA-Z0-9]+)")
234 class RenderHandler(http.server.BaseHTTPRequestHandler):
235 def write_file(self, file_path, mode = 'wb'):
236 length = int(self.headers['content-length'])
237 f = open(file_path, mode)
238 buf = self.rfile.read(length)
239 f.write(buf)
240 f.close()
241 del buf
243 def log_message(self, format, *args):
244 # override because the original calls self.address_string(), which
245 # is extremely slow due to some timeout..
246 sys.stderr.write("[%s] %s\n" % (self.log_date_time_string(), format%args))
248 def getInfoMap(self):
249 length = int(self.headers['content-length'])
251 if length > 0:
252 msg = str(self.rfile.read(length), encoding='utf8')
253 return json.loads(msg)
254 else:
255 return {}
257 def send_head(self, code = http.client.OK, headers = {}, content = "application/octet-stream"):
258 self.send_response(code)
260 if code == http.client.OK and content:
261 self.send_header("Content-type", content)
263 for key, value in headers.items():
264 self.send_header(key, value)
266 self.end_headers()
268 def do_HEAD(self):
270 if self.path == "/status":
271 job_id = self.headers.get('job-id', "")
272 job_frame = int(self.headers.get('job-frame', -1))
274 job = self.server.getJobID(job_id)
275 if job:
276 frame = job[job_frame]
279 if frame:
280 self.send_head(http.client.OK)
281 else:
282 # no such frame
283 self.send_head(http.client.NO_CONTENT)
284 else:
285 # no such job id
286 self.send_head(http.client.NO_CONTENT)
288 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
289 # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
290 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
291 # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
292 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
294 def do_GET(self):
296 if self.path == "/version":
297 self.send_head()
298 self.server.stats("", "Version check")
299 self.wfile.write(VERSION)
300 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
301 elif self.path.startswith("/render"):
302 match = render_pattern.match(self.path)
304 if match:
305 job_id = match.groups()[0]
306 frame_number = int(match.groups()[1])
308 job = self.server.getJobID(job_id)
310 if job:
311 frame = job[frame_number]
313 if frame:
314 if frame.status in {netrender.model.FRAME_QUEUED, netrender.model.FRAME_DISPATCHED}:
315 self.send_head(http.client.ACCEPTED)
316 elif frame.status == netrender.model.FRAME_DONE:
317 self.server.stats("", "Sending result to client")
319 filename = job.getResultPath(frame.getRenderFilename())
321 f = open(filename, 'rb')
322 self.send_head(content = "image/x-exr")
323 shutil.copyfileobj(f, self.wfile)
324 f.close()
325 elif frame.status == netrender.model.FRAME_ERROR:
326 self.send_head(http.client.PARTIAL_CONTENT)
327 else:
328 # no such frame
329 self.send_head(http.client.NO_CONTENT)
330 else:
331 # no such job id
332 self.send_head(http.client.NO_CONTENT)
333 else:
334 # invalid url
335 self.send_head(http.client.NO_CONTENT)
336 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
337 elif self.path.startswith("/result"):
338 match = result_pattern.match(self.path)
340 if match:
341 job_id = match.groups()[0]
343 job = self.server.getJobID(job_id)
345 if job:
346 self.server.stats("", "Sending result to client")
348 zip_filepath = job.getResultPath("results.zip")
349 with zipfile.ZipFile(zip_filepath, "w") as zfile:
350 for frame in job.frames:
351 if frame.status == netrender.model.FRAME_DONE:
352 for filename in frame.results:
353 filepath = job.getResultPath(filename)
355 zfile.write(filepath, filename)
358 f = open(zip_filepath, 'rb')
359 self.send_head(content = "application/x-zip-compressed")
360 shutil.copyfileobj(f, self.wfile)
361 f.close()
362 else:
363 # no such job id
364 self.send_head(http.client.NO_CONTENT)
365 else:
366 # invalid url
367 self.send_head(http.client.NO_CONTENT)
368 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
369 elif self.path.startswith("/thumb"):
370 match = thumb_pattern.match(self.path)
372 if match:
373 job_id = match.groups()[0]
374 frame_number = int(match.groups()[1])
376 job = self.server.getJobID(job_id)
378 if job:
379 frame = job[frame_number]
381 if frame:
382 if frame.status in {netrender.model.FRAME_QUEUED, netrender.model.FRAME_DISPATCHED}:
383 self.send_head(http.client.ACCEPTED)
384 elif frame.status == netrender.model.FRAME_DONE:
385 filename = job.getResultPath(frame.getRenderFilename())
387 thumbname = thumbnail.generate(filename)
389 if thumbname:
390 f = open(thumbname, 'rb')
391 self.send_head(content = "image/jpeg")
392 shutil.copyfileobj(f, self.wfile)
393 f.close()
394 else: # thumbnail couldn't be generated
395 self.send_head(http.client.PARTIAL_CONTENT)
396 return
397 elif frame.status == netrender.model.FRAME_ERROR:
398 self.send_head(http.client.PARTIAL_CONTENT)
399 else:
400 # no such frame
401 self.send_head(http.client.NO_CONTENT)
402 else:
403 # no such job id
404 self.send_head(http.client.NO_CONTENT)
405 else:
406 # invalid url
407 self.send_head(http.client.NO_CONTENT)
408 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
409 elif self.path.startswith("/log"):
410 match = log_pattern.match(self.path)
412 if match:
413 job_id = match.groups()[0]
414 frame_number = int(match.groups()[1])
416 job = self.server.getJobID(job_id)
418 if job:
419 frame = job[frame_number]
421 if frame:
422 if not frame.log_path or frame.status in {netrender.model.FRAME_QUEUED, netrender.model.FRAME_DISPATCHED}:
423 self.send_head(http.client.PROCESSING)
424 else:
425 self.server.stats("", "Sending log to client")
426 f = open(frame.log_path, 'rb')
428 self.send_head(content = "text/plain")
430 shutil.copyfileobj(f, self.wfile)
432 f.close()
433 else:
434 # no such frame
435 self.send_head(http.client.NO_CONTENT)
436 else:
437 # no such job id
438 self.send_head(http.client.NO_CONTENT)
439 else:
440 # invalid URL
441 self.send_head(http.client.NO_CONTENT)
442 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
443 elif self.path == "/status":
444 job_id = self.headers.get('job-id', "")
445 job_frame = int(self.headers.get('job-frame', -1))
447 if job_id:
449 job = self.server.getJobID(job_id)
450 if job:
451 if job_frame != -1:
452 frame = job[frame]
454 if frame:
455 message = frame.serialize()
456 else:
457 # no such frame
458 self.send_head(http.client.NO_CONTENT)
459 return
460 else:
461 message = job.serialize()
462 else:
463 # no such job id
464 self.send_head(http.client.NO_CONTENT)
465 return
466 else: # status of all jobs
467 message = []
469 for job in self.server:
470 message.append(job.serialize())
473 self.server.stats("", "Sending status")
474 self.send_head()
475 self.wfile.write(bytes(json.dumps(message), encoding='utf8'))
477 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
478 elif self.path == "/job":
479 self.server.balance()
481 slave_id = self.headers['slave-id']
483 slave = self.server.getSeenSlave(slave_id)
485 if slave: # only if slave id is valid
486 job, frames = self.server.newDispatch(slave)
488 if job and frames:
489 for f in frames:
490 print("dispatch", f.number)
491 f.status = netrender.model.FRAME_DISPATCHED
492 f.slave = slave
494 slave.job = job
495 slave.job_frames = [f.number for f in frames]
497 self.send_head(headers={"job-id": job.id})
499 message = job.serialize(frames)
500 self.wfile.write(bytes(json.dumps(message), encoding='utf8'))
502 self.server.stats("", "Sending job to slave")
503 else:
504 # no job available, return error code
505 slave.job = None
506 slave.job_frames = []
508 self.send_head(http.client.ACCEPTED)
509 else: # invalid slave id
510 self.send_head(http.client.NO_CONTENT)
511 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
512 elif self.path.startswith("/file"):
513 match = file_pattern.match(self.path)
515 if match:
516 slave_id = self.headers['slave-id']
517 slave = self.server.getSeenSlave(slave_id)
519 if not slave:
520 # invalid slave id
521 print("invalid slave id")
523 job_id = match.groups()[0]
524 file_index = int(match.groups()[1])
526 job = self.server.getJobID(job_id)
528 if job:
529 render_file = job.files[file_index]
531 if render_file:
532 self.server.stats("", "Sending file to slave")
533 f = open(render_file.filepath, 'rb')
535 self.send_head()
536 shutil.copyfileobj(f, self.wfile)
538 f.close()
539 else:
540 # no such file
541 self.send_head(http.client.NO_CONTENT)
542 else:
543 # no such job id
544 self.send_head(http.client.NO_CONTENT)
545 else: # invalid url
546 self.send_head(http.client.NO_CONTENT)
547 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
548 elif self.path == "/slaves":
549 message = []
551 self.server.stats("", "Sending slaves status")
553 for slave in self.server.slaves:
554 message.append(slave.serialize())
556 self.send_head()
558 self.wfile.write(bytes(json.dumps(message), encoding='utf8'))
559 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
560 else:
561 # hand over the rest to the html section
562 netrender.master_html.get(self)
564 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
565 # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
566 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
567 # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
568 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
569 def do_POST(self):
571 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
572 if self.path == "/job":
574 length = int(self.headers['content-length'])
576 job_info = netrender.model.RenderJob.materialize(json.loads(str(self.rfile.read(length), encoding='utf8')))
577 job_id = self.server.nextJobID()
579 job = MRenderJob(job_id, job_info)
581 job.setForceUpload(self.server.force)
583 for frame in job_info.frames:
584 frame = job.addFrame(frame.number, frame.command)
586 self.server.addJob(job)
588 headers={"job-id": job_id}
590 if job.testStart():
591 self.server.stats("", "New job, started")
592 self.send_head(headers=headers, content = None)
593 else:
594 self.server.stats("", "New job, missing files (%i total)" % len(job.files))
595 self.send_head(http.client.ACCEPTED, headers=headers)
596 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
597 elif self.path.startswith("/edit"):
598 match = edit_pattern.match(self.path)
600 if match:
601 job_id = match.groups()[0]
603 job = self.server.getJobID(job_id)
605 if job:
606 info_map = self.getInfoMap()
608 job.edit(info_map)
609 self.send_head(content = None)
610 else:
611 # no such job id
612 self.send_head(http.client.NO_CONTENT)
613 else:
614 # invalid url
615 self.send_head(http.client.NO_CONTENT)
616 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
617 elif self.path == "/balance_limit":
618 info_map = self.getInfoMap()
619 for rule_id, limit in info_map.items():
620 try:
621 rule = self.server.balancer.ruleByID(rule_id)
622 if rule:
623 rule.setLimit(limit)
624 except:
625 pass # invalid type
627 self.send_head(content = None)
628 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
629 elif self.path == "/balance_enable":
630 info_map = self.getInfoMap()
631 for rule_id, enabled in info_map.items():
632 rule = self.server.balancer.ruleByID(rule_id)
633 if rule:
634 rule.enabled = enabled
636 self.send_head(content = None)
637 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
638 elif self.path.startswith("/cancel"):
639 match = cancel_pattern.match(self.path)
641 if match:
642 info_map = self.getInfoMap()
643 clear = info_map.get("clear", False)
645 job_id = match.groups()[0]
647 job = self.server.getJobID(job_id)
649 if job:
650 self.server.stats("", "Cancelling job")
651 self.server.removeJob(job, clear)
652 self.send_head(content = None)
653 else:
654 # no such job id
655 self.send_head(http.client.NO_CONTENT)
656 else:
657 # invalid url
658 self.send_head(http.client.NO_CONTENT)
659 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
660 elif self.path.startswith("/pause"):
661 match = pause_pattern.match(self.path)
663 if match:
664 info_map = self.getInfoMap()
665 status = info_map.get("status", None)
667 job_id = match.groups()[0]
669 job = self.server.getJobID(job_id)
671 if job:
672 self.server.stats("", "Pausing job")
673 job.pause(status)
674 self.send_head(content = None)
675 else:
676 # no such job id
677 self.send_head(http.client.NO_CONTENT)
678 else:
679 # invalid url
680 self.send_head(http.client.NO_CONTENT)
681 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
682 elif self.path == "/clear":
683 # cancel all jobs
684 info_map = self.getInfoMap()
685 clear = info_map.get("clear", False)
687 self.server.stats("", "Clearing jobs")
688 self.server.clear(clear)
690 self.send_head(content = None)
691 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
692 elif self.path.startswith("/reset"):
693 match = reset_pattern.match(self.path)
695 if match:
696 all = match.groups()[0] == 'all'
697 job_id = match.groups()[1]
698 job_frame = int(match.groups()[2])
700 job = self.server.getJobID(job_id)
702 if job:
703 if job_frame != 0:
705 frame = job[job_frame]
706 if frame:
707 self.server.stats("", "Reset job frame")
708 frame.reset(all)
709 self.send_head(content = None)
710 else:
711 # no such frame
712 self.send_head(http.client.NO_CONTENT)
714 else:
715 self.server.stats("", "Reset job")
716 job.reset(all)
717 self.send_head(content = None)
719 else: # job not found
720 self.send_head(http.client.NO_CONTENT)
721 else: # invalid url
722 self.send_head(http.client.NO_CONTENT)
723 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
724 elif self.path == "/slave":
725 length = int(self.headers['content-length'])
726 # job_frame_string = self.headers['job-frame'] # UNUSED
728 self.server.stats("", "New slave connected")
730 slave_info = netrender.model.RenderSlave.materialize(json.loads(str(self.rfile.read(length), encoding='utf8')), cache = False)
732 slave_info.address = self.client_address
734 slave_id = self.server.addSlave(slave_info)
736 self.send_head(headers = {"slave-id": slave_id}, content = None)
737 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
738 elif self.path == "/log":
739 length = int(self.headers['content-length'])
741 log_info = netrender.model.LogFile.materialize(json.loads(str(self.rfile.read(length), encoding='utf8')))
743 slave_id = log_info.slave_id
745 slave = self.server.getSeenSlave(slave_id)
747 if slave: # only if slave id is valid
748 job = self.server.getJobID(log_info.job_id)
750 if job:
751 self.server.stats("", "Log announcement")
752 job.addLog(log_info.frames)
753 self.send_head(content = None)
754 else:
755 # no such job id
756 self.send_head(http.client.NO_CONTENT)
757 else: # invalid slave id
758 self.send_head(http.client.NO_CONTENT)
759 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
760 # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
761 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
762 # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
763 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
764 def do_PUT(self):
766 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
767 if self.path.startswith("/file"):
768 match = file_pattern.match(self.path)
770 if match:
771 self.server.stats("", "Receiving job file")
773 job_id = match.groups()[0]
774 file_index = int(match.groups()[1])
776 job = self.server.getJobID(job_id)
778 if job:
780 rfile = job.files[file_index]
782 if rfile:
783 main_file = job.files[0].original_path # original path of the first file
785 main_path, main_name = os.path.split(main_file)
787 if file_index > 0:
788 file_path = createLocalPath(rfile, job.save_path, main_path, True)
789 else:
790 file_path = os.path.join(job.save_path, main_name)
792 # add same temp file + renames as slave
794 self.write_file(file_path)
796 rfile.filepath = file_path # set the new path
797 found = rfile.updateStatus() # make sure we have the right file
799 if not found: # checksum mismatch
800 self.server.stats("", "File upload but checksum mismatch, this shouldn't happen")
801 self.send_head(http.client.CONFLICT)
802 elif job.testStart(): # started correctly
803 self.server.stats("", "File upload, starting job")
804 self.send_head(content = None)
805 else:
806 self.server.stats("", "File upload, dependency files still missing")
807 self.send_head(http.client.ACCEPTED)
808 else: # invalid file
809 print("file not found", job_id, file_index)
810 self.send_head(http.client.NO_CONTENT)
811 else: # job not found
812 print("job not found", job_id, file_index)
813 self.send_head(http.client.NO_CONTENT)
814 else: # invalid url
815 print("no match")
816 self.send_head(http.client.NO_CONTENT)
817 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
818 elif self.path == "/render":
819 self.server.stats("", "Receiving render result")
821 slave_id = self.headers['slave-id']
823 slave = self.server.getSeenSlave(slave_id)
825 if slave: # only if slave id is valid
826 job_id = self.headers['job-id']
828 job = self.server.getJobID(job_id)
830 if job:
831 job_frame = int(self.headers['job-frame'])
832 job_result = int(self.headers['job-result'])
833 job_time = float(self.headers['job-time'])
835 frame = job[job_frame]
837 if frame:
838 self.send_head(content = None)
840 if job.hasRenderResult():
841 if job_result == netrender.model.FRAME_DONE:
842 frame.addDefaultRenderResult()
843 self.write_file(job.getResultPath(frame.getRenderFilename()))
845 elif job_result == netrender.model.FRAME_ERROR:
846 # blacklist slave on this job on error
847 # slaves might already be in blacklist if errors on the whole chunk
848 if not slave.id in job.blacklist:
849 job.blacklist.append(slave.id)
851 slave.finishedFrame(job_frame)
853 frame.status = job_result
854 frame.time = job_time
856 job.testFinished()
858 else: # frame not found
859 self.send_head(http.client.NO_CONTENT)
860 else: # job not found
861 self.send_head(http.client.NO_CONTENT)
862 else: # invalid slave id
863 self.send_head(http.client.NO_CONTENT)
864 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
865 elif self.path == "/result":
866 self.server.stats("", "Receiving job result")
868 slave_id = self.headers['slave-id']
870 slave = self.server.getSeenSlave(slave_id)
872 if slave: # only if slave id is valid
873 job_id = self.headers['job-id']
875 job = self.server.getJobID(job_id)
877 if job:
878 job_frame = int(self.headers['job-frame'])
880 frame = job[job_frame]
882 if frame:
883 job_result = int(self.headers['job-result'])
884 job_finished = self.headers['job-finished'] == str(True)
886 self.send_head(content = None)
888 if job_result == netrender.model.FRAME_DONE:
889 result_filename = self.headers['result-filename']
891 frame.results.append(result_filename)
892 self.write_file(job.getResultPath(result_filename))
894 if job_finished:
895 job_time = float(self.headers['job-time'])
896 slave.finishedFrame(job_frame)
898 frame.status = job_result
899 frame.time = job_time
901 job.testFinished()
902 else: # frame not found
903 self.send_head(http.client.NO_CONTENT)
904 else: # job not found
905 self.send_head(http.client.NO_CONTENT)
906 else: # invalid slave id
907 self.send_head(http.client.NO_CONTENT)
908 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
909 elif self.path == "/thumb":
910 self.server.stats("", "Receiving thumbnail result")
912 slave_id = self.headers['slave-id']
914 slave = self.server.getSeenSlave(slave_id)
916 if slave: # only if slave id is valid
917 job_id = self.headers['job-id']
919 job = self.server.getJobID(job_id)
921 if job:
922 job_frame = int(self.headers['job-frame'])
924 frame = job[job_frame]
926 if frame:
927 self.send_head(content = None)
929 if job.hasRenderResult():
930 self.write_file(os.path.join(os.path.join(job.save_path, "%06d.jpg" % job_frame)))
932 else: # frame not found
933 self.send_head(http.client.NO_CONTENT)
934 else: # job not found
935 self.send_head(http.client.NO_CONTENT)
936 else: # invalid slave id
937 self.send_head(http.client.NO_CONTENT)
938 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
939 elif self.path.startswith("/log"):
940 self.server.stats("", "Receiving log file")
942 match = log_pattern.match(self.path)
944 if match:
945 job_id = match.groups()[0]
947 job = self.server.getJobID(job_id)
949 if job:
950 job_frame = int(match.groups()[1])
952 frame = job[job_frame]
954 if frame and frame.log_path:
955 self.send_head(content = None)
957 self.write_file(frame.log_path, 'ab')
959 self.server.getSeenSlave(self.headers['slave-id'])
961 else: # frame not found
962 self.send_head(http.client.NO_CONTENT)
963 else: # job not found
964 self.send_head(http.client.NO_CONTENT)
965 else: # invalid url
966 self.send_head(http.client.NO_CONTENT)
968 class RenderMasterServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
969 def __init__(self, address, handler_class, path, force=False, subdir=True):
970 self.jobs = []
971 self.jobs_map = {}
972 self.slaves = []
973 self.slaves_map = {}
974 self.job_id = 0
975 self.force = force
977 if subdir:
978 self.path = os.path.join(path, "master_" + str(os.getpid()))
979 else:
980 self.path = path
982 verifyCreateDir(self.path)
984 self.slave_timeout = 5 # 5 mins: need a parameter for that
986 self.balancer = netrender.balancing.Balancer()
987 self.balancer.addRule(netrender.balancing.RatingUsageByCategory(self.getJobs))
988 self.balancer.addRule(netrender.balancing.RatingUsage())
989 self.balancer.addException(netrender.balancing.ExcludeQueuedEmptyJob())
990 self.balancer.addException(netrender.balancing.ExcludeSlavesLimit(self.countJobs, self.countSlaves, limit = 0.9))
991 self.balancer.addPriority(netrender.balancing.NewJobPriority())
992 self.balancer.addPriority(netrender.balancing.MinimumTimeBetweenDispatchPriority(limit = 2))
994 super().__init__(address, handler_class)
996 def restore(self, jobs, slaves, balancer = None):
997 self.jobs = jobs
998 self.jobs_map = {}
1000 for job in self.jobs:
1001 self.jobs_map[job.id] = job
1002 self.job_id = max(self.job_id, int(job.id))
1004 self.slaves = slaves
1005 for slave in self.slaves:
1006 self.slaves_map[slave.id] = slave
1008 if balancer:
1009 self.balancer = balancer
1012 def nextJobID(self):
1013 self.job_id += 1
1014 return str(self.job_id)
1016 def addSlave(self, slave_info):
1017 slave = MRenderSlave(slave_info)
1018 self.slaves.append(slave)
1019 self.slaves_map[slave.id] = slave
1021 return slave.id
1023 def removeSlave(self, slave):
1024 self.slaves.remove(slave)
1025 self.slaves_map.pop(slave.id)
1027 def getSlave(self, slave_id):
1028 return self.slaves_map.get(slave_id)
1030 def getSeenSlave(self, slave_id):
1031 slave = self.getSlave(slave_id)
1032 if slave:
1033 slave.seen()
1035 return slave
1037 def timeoutSlaves(self):
1038 removed = []
1040 t = time.time()
1042 for slave in self.slaves:
1043 if (t - slave.last_seen) / 60 > self.slave_timeout:
1044 removed.append(slave)
1046 if slave.job:
1047 for f in slave.job_frames:
1048 slave.job[f].status = netrender.model.FRAME_ERROR
1050 for slave in removed:
1051 self.removeSlave(slave)
1053 def updateUsage(self):
1054 blend = 0.5
1055 for job in self.jobs:
1056 job.usage *= (1 - blend)
1058 if self.slaves:
1059 slave_usage = blend / self.countSlaves()
1061 for slave in self.slaves:
1062 if slave.job:
1063 slave.job.usage += slave_usage
1066 def clear(self, clear_files = False):
1067 removed = self.jobs[:]
1069 for job in removed:
1070 self.removeJob(job, clear_files)
1072 def balance(self):
1073 self.balancer.balance(self.jobs)
1075 def getJobs(self):
1076 return self.jobs
1078 def countJobs(self, status = netrender.model.JOB_QUEUED):
1079 total = 0
1080 for j in self.jobs:
1081 if j.status == status:
1082 total += 1
1084 return total
1086 def countSlaves(self):
1087 return len(self.slaves)
1089 def removeJob(self, job, clear_files = False):
1090 self.jobs.remove(job)
1091 self.jobs_map.pop(job.id)
1093 if clear_files:
1094 shutil.rmtree(job.save_path)
1096 for slave in self.slaves:
1097 if slave.job == job:
1098 slave.job = None
1099 slave.job_frames = []
1101 def addJob(self, job):
1102 self.jobs.append(job)
1103 self.jobs_map[job.id] = job
1105 # create job directory
1106 job.save_path = os.path.join(self.path, "job_" + job.id)
1107 verifyCreateDir(job.save_path)
1109 job.save()
1111 def getJobID(self, id):
1112 return self.jobs_map.get(id)
1114 def __iter__(self):
1115 for job in self.jobs:
1116 yield job
1118 def newDispatch(self, slave):
1119 if self.jobs:
1120 for job in self.jobs:
1121 if (
1122 not self.balancer.applyExceptions(job) # No exceptions
1123 and slave.id not in job.blacklist # slave is not blacklisted
1124 and (not slave.tags or job.tags.issubset(slave.tags)) # slave doesn't use tags or slave has all job tags
1127 return job, job.getFrames()
1129 return None, None
1131 def clearMaster(path):
1132 shutil.rmtree(path)
1134 def createMaster(address, clear, force, path):
1135 filepath = os.path.join(path, "blender_master.data")
1137 if not clear and os.path.exists(filepath):
1138 print("loading saved master:", filepath)
1139 with open(filepath, 'rb') as f:
1140 path, jobs, slaves = pickle.load(f)
1142 httpd = RenderMasterServer(address, RenderHandler, path, force=force, subdir=False)
1143 httpd.restore(jobs, slaves)
1145 return httpd
1147 return RenderMasterServer(address, RenderHandler, path, force=force)
1149 def saveMaster(path, httpd):
1150 filepath = os.path.join(path, "blender_master.data")
1152 with open(filepath, 'wb') as f:
1153 pickle.dump((httpd.path, httpd.jobs, httpd.slaves), f, pickle.HIGHEST_PROTOCOL)
1155 def runMaster(address, broadcast, clear, force, path, update_stats, test_break,use_ssl=False,cert_path="",key_path=""):
1156 httpd = createMaster(address, clear, force, path)
1157 httpd.timeout = 1
1158 httpd.stats = update_stats
1159 if use_ssl:
1160 import ssl
1161 httpd.socket = ssl.wrap_socket(
1162 httpd.socket,
1163 certfile=cert_path,
1164 server_side=True,
1165 keyfile=key_path,
1166 ciphers="ALL",
1167 ssl_version=ssl.PROTOCOL_SSLv23,
1169 if broadcast:
1170 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1171 s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
1173 start_time = time.time() - 2
1175 while not test_break():
1176 try:
1177 httpd.handle_request()
1178 except select.error:
1179 pass
1181 if time.time() - start_time >= 2: # need constant here
1182 httpd.timeoutSlaves()
1184 httpd.updateUsage()
1186 if broadcast:
1187 print("broadcasting address")
1188 s.sendto(bytes("%i" % address[1], encoding='utf8'), 0, ('<broadcast>', 8000))
1189 start_time = time.time()
1191 httpd.server_close()
1192 if clear:
1193 clearMaster(httpd.path)
1194 else:
1195 saveMaster(path, httpd)