1 # ##### BEGIN GPL LICENSE BLOCK #####
3 # This program is free software; you can redistribute it and/or
4 # modify it under the terms of the GNU General Public License
5 # as published by the Free Software Foundation; either version 2
6 # of the License, or (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program; if not, write to the Free Software Foundation,
15 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17 # ##### END GPL LICENSE BLOCK #####
20 import http
, http
.client
, http
.server
, socket
, socketserver
21 import shutil
, time
, hashlib
24 import select
# for select.error
28 from netrender
.utils
import *
29 import netrender
.model
30 import netrender
.balancing
31 import netrender
.master_html
32 import netrender
.thumbnail
as thumbnail
34 class MRenderFile(netrender
.model
.RenderFile
):
35 def __init__(self
, filepath
, index
, start
, end
, signature
):
36 super().__init
__(filepath
, index
, start
, end
, signature
)
39 def updateStatus(self
):
40 self
.found
= os
.path
.exists(self
.filepath
)
42 if self
.found
and self
.signature
is not None:
43 found_signature
= hashFile(self
.filepath
)
44 self
.found
= self
.signature
== found_signature
46 print("Signature mismatch", self
.signature
, found_signature
)
51 # don't check when forcing upload and only until found
52 if not self
.force
and not self
.found
:
58 class MRenderSlave(netrender
.model
.RenderSlave
):
59 def __init__(self
, slave_info
):
60 super().__init
__(slave_info
)
61 self
.id = hashlib
.md5(bytes(repr(slave_info
.name
) + repr(slave_info
.address
), encoding
='utf8')).hexdigest()
62 self
.last_seen
= time
.time()
68 netrender
.model
.RenderSlave
._slave
_map
[self
.id] = self
71 self
.last_seen
= time
.time()
73 def finishedFrame(self
, frame_number
):
75 self
.job_frames
.remove(frame_number
)
76 except ValueError as e
:
77 print("Internal error: Frame %i not in job frames list" % frame_number
)
78 print(self
.job_frames
)
79 if not self
.job_frames
:
82 class MRenderJob(netrender
.model
.RenderJob
):
83 def __init__(self
, job_id
, job_info
):
84 super().__init
__(job_info
)
86 self
.last_dispatched
= time
.time()
87 self
.start_time
= time
.time()
88 self
.finish_time
= self
.start_time
89 # force one chunk for process jobs
90 if self
.type == netrender
.model
.JOB_PROCESS
:
93 # Force WAITING status on creation
94 self
.status
= netrender
.model
.JOB_WAITING
96 # special server properties
99 self
.files
= [MRenderFile(rfile
.filepath
, rfile
.index
, rfile
.start
, rfile
.end
, rfile
.signature
) for rfile
in job_info
.files
]
101 def setForceUpload(self
, force
):
102 for rfile
in self
.files
:
106 if not self
.resolution
:
107 self
.resolution
= tuple(getFileInfo(self
.files
[0].filepath
, ["bpy.context.scene.render.resolution_x", "bpy.context.scene.render.resolution_y", "bpy.context.scene.render.resolution_percentage"]))
111 f
= open(os
.path
.join(self
.save_path
, "job.txt"), "w")
112 f
.write(json
.dumps(self
.serialize()))
115 def edit(self
, info_map
):
116 if "status" in info_map
:
117 self
.status
= info_map
["status"]
119 if "priority" in info_map
:
120 self
.priority
= info_map
["priority"]
122 if "chunks" in info_map
:
123 self
.chunks
= info_map
["chunks"]
# Don't test files for versioned jobs
127 if not self
.version_info
:
136 def testFinished(self
):
137 for f
in self
.frames
:
138 if f
.status
in {netrender
.model
.FRAME_QUEUED
, netrender
.model
.FRAME_DISPATCHED
, netrender
.model
.FRAME_ERROR
}:
141 self
.status
= netrender
.model
.JOB_FINISHED
142 self
.finish_time
=time
.time()
144 def pause(self
, status
= None):
145 if self
.status
not in {netrender
.model
.JOB_PAUSED
, netrender
.model
.JOB_QUEUED
}:
149 self
.status
= netrender
.model
.JOB_PAUSED
if self
.status
== netrender
.model
.JOB_QUEUED
else netrender
.model
.JOB_QUEUED
151 self
.status
= netrender
.model
.JOB_QUEUED
153 self
.status
= netrender
.model
.JOB_PAUSED
156 self
.status
= netrender
.model
.JOB_QUEUED
159 def addLog(self
, frames
):
160 frames
= sorted(frames
)
161 log_name
= "%06d_%06d.log" % (frames
[0], frames
[-1])
162 log_path
= os
.path
.join(self
.save_path
, log_name
)
164 for number
in frames
:
167 frame
.log_path
= log_path
169 def addFrame(self
, frame_number
, command
):
170 frame
= MRenderFrame(frame_number
, command
)
171 self
.frames
.append(frame
)
174 def reset(self
, all
):
175 for f
in self
.frames
:
179 self
.status
= netrender
.model
.JOB_QUEUED
183 for f
in self
.frames
:
184 if f
.status
== netrender
.model
.FRAME_QUEUED
:
185 self
.last_dispatched
= time
.time()
187 if len(frames
) >= self
.chunks
:
def getResultPath(self, filename):
    """Return `filename` joined onto this job's `save_path` directory."""
    base_dir = self.save_path
    return os.path.join(base_dir, filename)
195 class MRenderFrame(netrender
.model
.RenderFrame
):
196 def __init__(self
, frame
, command
):
201 self
.status
= netrender
.model
.FRAME_QUEUED
202 self
.command
= command
def addDefaultRenderResult(self):
    """Append this frame's default render filename to `self.results`."""
    default_name = self.getRenderFilename()
    self.results.append(default_name)
def getRenderFilename(self):
    """Return the frame number zero-padded to six digits with an `.exr` suffix."""
    name_template = "%06d.exr"
    return name_template % self.number
212 def reset(self
, all
):
213 if all
or self
.status
== netrender
.model
.FRAME_ERROR
:
217 self
.status
= netrender
.model
.FRAME_QUEUED
220 # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
221 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
222 # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
223 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
# URL path patterns matched against the request path by the HTTP handlers.
# The first alphanumeric group is the job id; a trailing numeric group, where
# present, is the frame number (or the file index for file_pattern).
# reset_pattern's first group is "all" or the empty string.
#
# Raw strings are used and the extension separator is escaped, so that only
# a literal "." is accepted (previously "." matched any character, letting
# paths such as "/result_7_zip" through).
file_pattern = re.compile(r"/file_([a-zA-Z0-9]+)_([0-9]+)")
render_pattern = re.compile(r"/render_([a-zA-Z0-9]+)_([0-9]+)\.exr")
result_pattern = re.compile(r"/result_([a-zA-Z0-9]+)\.zip")
thumb_pattern = re.compile(r"/thumb_([a-zA-Z0-9]+)_([0-9]+)\.jpg")
log_pattern = re.compile(r"/log_([a-zA-Z0-9]+)_([0-9]+)\.log")
reset_pattern = re.compile(r"/reset(all|)_([a-zA-Z0-9]+)_([0-9]+)")
cancel_pattern = re.compile(r"/cancel_([a-zA-Z0-9]+)")
pause_pattern = re.compile(r"/pause_([a-zA-Z0-9]+)")
edit_pattern = re.compile(r"/edit_([a-zA-Z0-9]+)")
234 class RenderHandler(http
.server
.BaseHTTPRequestHandler
):
235 def write_file(self
, file_path
, mode
= 'wb'):
236 length
= int(self
.headers
['content-length'])
237 f
= open(file_path
, mode
)
238 buf
= self
.rfile
.read(length
)
def log_message(self, format, *args):
    """Write one timestamped log line to stderr, without the client address."""
    # Overridden on purpose: the stock implementation calls
    # self.address_string(), which is extremely slow due to some timeout.
    stamp = self.log_date_time_string()
    text = format % args
    sys.stderr.write("[%s] %s\n" % (stamp, text))
248 def getInfoMap(self
):
249 length
= int(self
.headers
['content-length'])
252 msg
= str(self
.rfile
.read(length
), encoding
='utf8')
253 return json
.loads(msg
)
257 def send_head(self
, code
= http
.client
.OK
, headers
= {}, content
= "application/octet-stream"):
258 self
.send_response(code
)
260 if code
== http
.client
.OK
and content
:
261 self
.send_header("Content-type", content
)
263 for key
, value
in headers
.items():
264 self
.send_header(key
, value
)
270 if self
.path
== "/status":
271 job_id
= self
.headers
.get('job-id', "")
272 job_frame
= int(self
.headers
.get('job-frame', -1))
274 job
= self
.server
.getJobID(job_id
)
276 frame
= job
[job_frame
]
280 self
.send_head(http
.client
.OK
)
283 self
.send_head(http
.client
.NO_CONTENT
)
286 self
.send_head(http
.client
.NO_CONTENT
)
288 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
289 # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
290 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
291 # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
292 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
296 if self
.path
== "/version":
298 self
.server
.stats("", "Version check")
299 self
.wfile
.write(VERSION
)
300 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
301 elif self
.path
.startswith("/render"):
302 match
= render_pattern
.match(self
.path
)
305 job_id
= match
.groups()[0]
306 frame_number
= int(match
.groups()[1])
308 job
= self
.server
.getJobID(job_id
)
311 frame
= job
[frame_number
]
314 if frame
.status
in {netrender
.model
.FRAME_QUEUED
, netrender
.model
.FRAME_DISPATCHED
}:
315 self
.send_head(http
.client
.ACCEPTED
)
316 elif frame
.status
== netrender
.model
.FRAME_DONE
:
317 self
.server
.stats("", "Sending result to client")
319 filename
= job
.getResultPath(frame
.getRenderFilename())
321 f
= open(filename
, 'rb')
322 self
.send_head(content
= "image/x-exr")
323 shutil
.copyfileobj(f
, self
.wfile
)
325 elif frame
.status
== netrender
.model
.FRAME_ERROR
:
326 self
.send_head(http
.client
.PARTIAL_CONTENT
)
329 self
.send_head(http
.client
.NO_CONTENT
)
332 self
.send_head(http
.client
.NO_CONTENT
)
335 self
.send_head(http
.client
.NO_CONTENT
)
336 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
337 elif self
.path
.startswith("/result"):
338 match
= result_pattern
.match(self
.path
)
341 job_id
= match
.groups()[0]
343 job
= self
.server
.getJobID(job_id
)
346 self
.server
.stats("", "Sending result to client")
348 zip_filepath
= job
.getResultPath("results.zip")
349 with zipfile
.ZipFile(zip_filepath
, "w") as zfile
:
350 for frame
in job
.frames
:
351 if frame
.status
== netrender
.model
.FRAME_DONE
:
352 for filename
in frame
.results
:
353 filepath
= job
.getResultPath(filename
)
355 zfile
.write(filepath
, filename
)
358 f
= open(zip_filepath
, 'rb')
359 self
.send_head(content
= "application/x-zip-compressed")
360 shutil
.copyfileobj(f
, self
.wfile
)
364 self
.send_head(http
.client
.NO_CONTENT
)
367 self
.send_head(http
.client
.NO_CONTENT
)
368 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
369 elif self
.path
.startswith("/thumb"):
370 match
= thumb_pattern
.match(self
.path
)
373 job_id
= match
.groups()[0]
374 frame_number
= int(match
.groups()[1])
376 job
= self
.server
.getJobID(job_id
)
379 frame
= job
[frame_number
]
382 if frame
.status
in {netrender
.model
.FRAME_QUEUED
, netrender
.model
.FRAME_DISPATCHED
}:
383 self
.send_head(http
.client
.ACCEPTED
)
384 elif frame
.status
== netrender
.model
.FRAME_DONE
:
385 filename
= job
.getResultPath(frame
.getRenderFilename())
387 thumbname
= thumbnail
.generate(filename
)
390 f
= open(thumbname
, 'rb')
391 self
.send_head(content
= "image/jpeg")
392 shutil
.copyfileobj(f
, self
.wfile
)
394 else: # thumbnail couldn't be generated
395 self
.send_head(http
.client
.PARTIAL_CONTENT
)
397 elif frame
.status
== netrender
.model
.FRAME_ERROR
:
398 self
.send_head(http
.client
.PARTIAL_CONTENT
)
401 self
.send_head(http
.client
.NO_CONTENT
)
404 self
.send_head(http
.client
.NO_CONTENT
)
407 self
.send_head(http
.client
.NO_CONTENT
)
408 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
409 elif self
.path
.startswith("/log"):
410 match
= log_pattern
.match(self
.path
)
413 job_id
= match
.groups()[0]
414 frame_number
= int(match
.groups()[1])
416 job
= self
.server
.getJobID(job_id
)
419 frame
= job
[frame_number
]
422 if not frame
.log_path
or frame
.status
in {netrender
.model
.FRAME_QUEUED
, netrender
.model
.FRAME_DISPATCHED
}:
423 self
.send_head(http
.client
.PROCESSING
)
425 self
.server
.stats("", "Sending log to client")
426 f
= open(frame
.log_path
, 'rb')
428 self
.send_head(content
= "text/plain")
430 shutil
.copyfileobj(f
, self
.wfile
)
435 self
.send_head(http
.client
.NO_CONTENT
)
438 self
.send_head(http
.client
.NO_CONTENT
)
441 self
.send_head(http
.client
.NO_CONTENT
)
442 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
443 elif self
.path
== "/status":
444 job_id
= self
.headers
.get('job-id', "")
445 job_frame
= int(self
.headers
.get('job-frame', -1))
449 job
= self
.server
.getJobID(job_id
)
455 message
= frame
.serialize()
458 self
.send_head(http
.client
.NO_CONTENT
)
461 message
= job
.serialize()
464 self
.send_head(http
.client
.NO_CONTENT
)
466 else: # status of all jobs
469 for job
in self
.server
:
470 message
.append(job
.serialize())
473 self
.server
.stats("", "Sending status")
475 self
.wfile
.write(bytes(json
.dumps(message
), encoding
='utf8'))
477 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
478 elif self
.path
== "/job":
479 self
.server
.balance()
481 slave_id
= self
.headers
['slave-id']
483 slave
= self
.server
.getSeenSlave(slave_id
)
485 if slave
: # only if slave id is valid
486 job
, frames
= self
.server
.newDispatch(slave
)
490 print("dispatch", f
.number
)
491 f
.status
= netrender
.model
.FRAME_DISPATCHED
495 slave
.job_frames
= [f
.number
for f
in frames
]
497 self
.send_head(headers
={"job-id": job
.id})
499 message
= job
.serialize(frames
)
500 self
.wfile
.write(bytes(json
.dumps(message
), encoding
='utf8'))
502 self
.server
.stats("", "Sending job to slave")
504 # no job available, return error code
506 slave
.job_frames
= []
508 self
.send_head(http
.client
.ACCEPTED
)
509 else: # invalid slave id
510 self
.send_head(http
.client
.NO_CONTENT
)
511 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
512 elif self
.path
.startswith("/file"):
513 match
= file_pattern
.match(self
.path
)
516 slave_id
= self
.headers
['slave-id']
517 slave
= self
.server
.getSeenSlave(slave_id
)
521 print("invalid slave id")
523 job_id
= match
.groups()[0]
524 file_index
= int(match
.groups()[1])
526 job
= self
.server
.getJobID(job_id
)
529 render_file
= job
.files
[file_index
]
532 self
.server
.stats("", "Sending file to slave")
533 f
= open(render_file
.filepath
, 'rb')
536 shutil
.copyfileobj(f
, self
.wfile
)
541 self
.send_head(http
.client
.NO_CONTENT
)
544 self
.send_head(http
.client
.NO_CONTENT
)
546 self
.send_head(http
.client
.NO_CONTENT
)
547 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
548 elif self
.path
== "/slaves":
551 self
.server
.stats("", "Sending slaves status")
553 for slave
in self
.server
.slaves
:
554 message
.append(slave
.serialize())
558 self
.wfile
.write(bytes(json
.dumps(message
), encoding
='utf8'))
559 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
561 # hand over the rest to the html section
562 netrender
.master_html
.get(self
)
564 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
565 # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
566 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
567 # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
568 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
571 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
572 if self
.path
== "/job":
574 length
= int(self
.headers
['content-length'])
576 job_info
= netrender
.model
.RenderJob
.materialize(json
.loads(str(self
.rfile
.read(length
), encoding
='utf8')))
577 job_id
= self
.server
.nextJobID()
579 job
= MRenderJob(job_id
, job_info
)
581 job
.setForceUpload(self
.server
.force
)
583 for frame
in job_info
.frames
:
584 frame
= job
.addFrame(frame
.number
, frame
.command
)
586 self
.server
.addJob(job
)
588 headers
={"job-id": job_id
}
591 self
.server
.stats("", "New job, started")
592 self
.send_head(headers
=headers
, content
= None)
594 self
.server
.stats("", "New job, missing files (%i total)" % len(job
.files
))
595 self
.send_head(http
.client
.ACCEPTED
, headers
=headers
)
596 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
597 elif self
.path
.startswith("/edit"):
598 match
= edit_pattern
.match(self
.path
)
601 job_id
= match
.groups()[0]
603 job
= self
.server
.getJobID(job_id
)
606 info_map
= self
.getInfoMap()
609 self
.send_head(content
= None)
612 self
.send_head(http
.client
.NO_CONTENT
)
615 self
.send_head(http
.client
.NO_CONTENT
)
616 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
617 elif self
.path
== "/balance_limit":
618 info_map
= self
.getInfoMap()
619 for rule_id
, limit
in info_map
.items():
621 rule
= self
.server
.balancer
.ruleByID(rule_id
)
627 self
.send_head(content
= None)
628 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
629 elif self
.path
== "/balance_enable":
630 info_map
= self
.getInfoMap()
631 for rule_id
, enabled
in info_map
.items():
632 rule
= self
.server
.balancer
.ruleByID(rule_id
)
634 rule
.enabled
= enabled
636 self
.send_head(content
= None)
637 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
638 elif self
.path
.startswith("/cancel"):
639 match
= cancel_pattern
.match(self
.path
)
642 info_map
= self
.getInfoMap()
643 clear
= info_map
.get("clear", False)
645 job_id
= match
.groups()[0]
647 job
= self
.server
.getJobID(job_id
)
650 self
.server
.stats("", "Cancelling job")
651 self
.server
.removeJob(job
, clear
)
652 self
.send_head(content
= None)
655 self
.send_head(http
.client
.NO_CONTENT
)
658 self
.send_head(http
.client
.NO_CONTENT
)
659 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
660 elif self
.path
.startswith("/pause"):
661 match
= pause_pattern
.match(self
.path
)
664 info_map
= self
.getInfoMap()
665 status
= info_map
.get("status", None)
667 job_id
= match
.groups()[0]
669 job
= self
.server
.getJobID(job_id
)
672 self
.server
.stats("", "Pausing job")
674 self
.send_head(content
= None)
677 self
.send_head(http
.client
.NO_CONTENT
)
680 self
.send_head(http
.client
.NO_CONTENT
)
681 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
682 elif self
.path
== "/clear":
684 info_map
= self
.getInfoMap()
685 clear
= info_map
.get("clear", False)
687 self
.server
.stats("", "Clearing jobs")
688 self
.server
.clear(clear
)
690 self
.send_head(content
= None)
691 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
692 elif self
.path
.startswith("/reset"):
693 match
= reset_pattern
.match(self
.path
)
696 all
= match
.groups()[0] == 'all'
697 job_id
= match
.groups()[1]
698 job_frame
= int(match
.groups()[2])
700 job
= self
.server
.getJobID(job_id
)
705 frame
= job
[job_frame
]
707 self
.server
.stats("", "Reset job frame")
709 self
.send_head(content
= None)
712 self
.send_head(http
.client
.NO_CONTENT
)
715 self
.server
.stats("", "Reset job")
717 self
.send_head(content
= None)
719 else: # job not found
720 self
.send_head(http
.client
.NO_CONTENT
)
722 self
.send_head(http
.client
.NO_CONTENT
)
723 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
724 elif self
.path
== "/slave":
725 length
= int(self
.headers
['content-length'])
726 # job_frame_string = self.headers['job-frame'] # UNUSED
728 self
.server
.stats("", "New slave connected")
730 slave_info
= netrender
.model
.RenderSlave
.materialize(json
.loads(str(self
.rfile
.read(length
), encoding
='utf8')), cache
= False)
732 slave_info
.address
= self
.client_address
734 slave_id
= self
.server
.addSlave(slave_info
)
736 self
.send_head(headers
= {"slave-id": slave_id
}, content
= None)
737 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
738 elif self
.path
== "/log":
739 length
= int(self
.headers
['content-length'])
741 log_info
= netrender
.model
.LogFile
.materialize(json
.loads(str(self
.rfile
.read(length
), encoding
='utf8')))
743 slave_id
= log_info
.slave_id
745 slave
= self
.server
.getSeenSlave(slave_id
)
747 if slave
: # only if slave id is valid
748 job
= self
.server
.getJobID(log_info
.job_id
)
751 self
.server
.stats("", "Log announcement")
752 job
.addLog(log_info
.frames
)
753 self
.send_head(content
= None)
756 self
.send_head(http
.client
.NO_CONTENT
)
757 else: # invalid slave id
758 self
.send_head(http
.client
.NO_CONTENT
)
759 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
760 # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
761 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
762 # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
763 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
766 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
767 if self
.path
.startswith("/file"):
768 match
= file_pattern
.match(self
.path
)
771 self
.server
.stats("", "Receiving job file")
773 job_id
= match
.groups()[0]
774 file_index
= int(match
.groups()[1])
776 job
= self
.server
.getJobID(job_id
)
780 rfile
= job
.files
[file_index
]
783 main_file
= job
.files
[0].original_path
# original path of the first file
785 main_path
, main_name
= os
.path
.split(main_file
)
788 file_path
= createLocalPath(rfile
, job
.save_path
, main_path
, True)
790 file_path
= os
.path
.join(job
.save_path
, main_name
)
792 # add same temp file + renames as slave
794 self
.write_file(file_path
)
796 rfile
.filepath
= file_path
# set the new path
797 found
= rfile
.updateStatus() # make sure we have the right file
799 if not found
: # checksum mismatch
800 self
.server
.stats("", "File upload but checksum mismatch, this shouldn't happen")
801 self
.send_head(http
.client
.CONFLICT
)
802 elif job
.testStart(): # started correctly
803 self
.server
.stats("", "File upload, starting job")
804 self
.send_head(content
= None)
806 self
.server
.stats("", "File upload, dependency files still missing")
807 self
.send_head(http
.client
.ACCEPTED
)
809 print("file not found", job_id
, file_index
)
810 self
.send_head(http
.client
.NO_CONTENT
)
811 else: # job not found
812 print("job not found", job_id
, file_index
)
813 self
.send_head(http
.client
.NO_CONTENT
)
816 self
.send_head(http
.client
.NO_CONTENT
)
817 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
818 elif self
.path
== "/render":
819 self
.server
.stats("", "Receiving render result")
821 slave_id
= self
.headers
['slave-id']
823 slave
= self
.server
.getSeenSlave(slave_id
)
825 if slave
: # only if slave id is valid
826 job_id
= self
.headers
['job-id']
828 job
= self
.server
.getJobID(job_id
)
831 job_frame
= int(self
.headers
['job-frame'])
832 job_result
= int(self
.headers
['job-result'])
833 job_time
= float(self
.headers
['job-time'])
835 frame
= job
[job_frame
]
838 self
.send_head(content
= None)
840 if job
.hasRenderResult():
841 if job_result
== netrender
.model
.FRAME_DONE
:
842 frame
.addDefaultRenderResult()
843 self
.write_file(job
.getResultPath(frame
.getRenderFilename()))
845 elif job_result
== netrender
.model
.FRAME_ERROR
:
846 # blacklist slave on this job on error
847 # slaves might already be in blacklist if errors on the whole chunk
848 if not slave
.id in job
.blacklist
:
849 job
.blacklist
.append(slave
.id)
851 slave
.finishedFrame(job_frame
)
853 frame
.status
= job_result
854 frame
.time
= job_time
858 else: # frame not found
859 self
.send_head(http
.client
.NO_CONTENT
)
860 else: # job not found
861 self
.send_head(http
.client
.NO_CONTENT
)
862 else: # invalid slave id
863 self
.send_head(http
.client
.NO_CONTENT
)
864 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
865 elif self
.path
== "/result":
866 self
.server
.stats("", "Receiving job result")
868 slave_id
= self
.headers
['slave-id']
870 slave
= self
.server
.getSeenSlave(slave_id
)
872 if slave
: # only if slave id is valid
873 job_id
= self
.headers
['job-id']
875 job
= self
.server
.getJobID(job_id
)
878 job_frame
= int(self
.headers
['job-frame'])
880 frame
= job
[job_frame
]
883 job_result
= int(self
.headers
['job-result'])
884 job_finished
= self
.headers
['job-finished'] == str(True)
886 self
.send_head(content
= None)
888 if job_result
== netrender
.model
.FRAME_DONE
:
889 result_filename
= self
.headers
['result-filename']
891 frame
.results
.append(result_filename
)
892 self
.write_file(job
.getResultPath(result_filename
))
895 job_time
= float(self
.headers
['job-time'])
896 slave
.finishedFrame(job_frame
)
898 frame
.status
= job_result
899 frame
.time
= job_time
902 else: # frame not found
903 self
.send_head(http
.client
.NO_CONTENT
)
904 else: # job not found
905 self
.send_head(http
.client
.NO_CONTENT
)
906 else: # invalid slave id
907 self
.send_head(http
.client
.NO_CONTENT
)
908 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
909 elif self
.path
== "/thumb":
910 self
.server
.stats("", "Receiving thumbnail result")
912 slave_id
= self
.headers
['slave-id']
914 slave
= self
.server
.getSeenSlave(slave_id
)
916 if slave
: # only if slave id is valid
917 job_id
= self
.headers
['job-id']
919 job
= self
.server
.getJobID(job_id
)
922 job_frame
= int(self
.headers
['job-frame'])
924 frame
= job
[job_frame
]
927 self
.send_head(content
= None)
929 if job
.hasRenderResult():
930 self
.write_file(os
.path
.join(os
.path
.join(job
.save_path
, "%06d.jpg" % job_frame
)))
932 else: # frame not found
933 self
.send_head(http
.client
.NO_CONTENT
)
934 else: # job not found
935 self
.send_head(http
.client
.NO_CONTENT
)
936 else: # invalid slave id
937 self
.send_head(http
.client
.NO_CONTENT
)
938 # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
939 elif self
.path
.startswith("/log"):
940 self
.server
.stats("", "Receiving log file")
942 match
= log_pattern
.match(self
.path
)
945 job_id
= match
.groups()[0]
947 job
= self
.server
.getJobID(job_id
)
950 job_frame
= int(match
.groups()[1])
952 frame
= job
[job_frame
]
954 if frame
and frame
.log_path
:
955 self
.send_head(content
= None)
957 self
.write_file(frame
.log_path
, 'ab')
959 self
.server
.getSeenSlave(self
.headers
['slave-id'])
961 else: # frame not found
962 self
.send_head(http
.client
.NO_CONTENT
)
963 else: # job not found
964 self
.send_head(http
.client
.NO_CONTENT
)
966 self
.send_head(http
.client
.NO_CONTENT
)
968 class RenderMasterServer(socketserver
.ThreadingMixIn
, http
.server
.HTTPServer
):
969 def __init__(self
, address
, handler_class
, path
, force
=False, subdir
=True):
978 self
.path
= os
.path
.join(path
, "master_" + str(os
.getpid()))
982 verifyCreateDir(self
.path
)
984 self
.slave_timeout
= 5 # 5 mins: need a parameter for that
986 self
.balancer
= netrender
.balancing
.Balancer()
987 self
.balancer
.addRule(netrender
.balancing
.RatingUsageByCategory(self
.getJobs
))
988 self
.balancer
.addRule(netrender
.balancing
.RatingUsage())
989 self
.balancer
.addException(netrender
.balancing
.ExcludeQueuedEmptyJob())
990 self
.balancer
.addException(netrender
.balancing
.ExcludeSlavesLimit(self
.countJobs
, self
.countSlaves
, limit
= 0.9))
991 self
.balancer
.addPriority(netrender
.balancing
.NewJobPriority())
992 self
.balancer
.addPriority(netrender
.balancing
.MinimumTimeBetweenDispatchPriority(limit
= 2))
994 super().__init
__(address
, handler_class
)
996 def restore(self
, jobs
, slaves
, balancer
= None):
1000 for job
in self
.jobs
:
1001 self
.jobs_map
[job
.id] = job
1002 self
.job_id
= max(self
.job_id
, int(job
.id))
1004 self
.slaves
= slaves
1005 for slave
in self
.slaves
:
1006 self
.slaves_map
[slave
.id] = slave
1009 self
.balancer
= balancer
1012 def nextJobID(self
):
1014 return str(self
.job_id
)
1016 def addSlave(self
, slave_info
):
1017 slave
= MRenderSlave(slave_info
)
1018 self
.slaves
.append(slave
)
1019 self
.slaves_map
[slave
.id] = slave
def removeSlave(self, slave):
    """Drop `slave` from both the slave list and the id-to-slave map."""
    # List first, then map, so a slave missing from the list raises
    # before the map is touched.
    self.slaves.remove(slave)
    del self.slaves_map[slave.id]
def getSlave(self, slave_id):
    """Look up a slave by id in `slaves_map`; returns None when absent."""
    registry = self.slaves_map
    return registry.get(slave_id)
1030 def getSeenSlave(self
, slave_id
):
1031 slave
= self
.getSlave(slave_id
)
1037 def timeoutSlaves(self
):
1042 for slave
in self
.slaves
:
1043 if (t
- slave
.last_seen
) / 60 > self
.slave_timeout
:
1044 removed
.append(slave
)
1047 for f
in slave
.job_frames
:
1048 slave
.job
[f
].status
= netrender
.model
.FRAME_ERROR
1050 for slave
in removed
:
1051 self
.removeSlave(slave
)
1053 def updateUsage(self
):
1055 for job
in self
.jobs
:
1056 job
.usage
*= (1 - blend
)
1059 slave_usage
= blend
/ self
.countSlaves()
1061 for slave
in self
.slaves
:
1063 slave
.job
.usage
+= slave_usage
1066 def clear(self
, clear_files
= False):
1067 removed
= self
.jobs
[:]
1070 self
.removeJob(job
, clear_files
)
1073 self
.balancer
.balance(self
.jobs
)
1078 def countJobs(self
, status
= netrender
.model
.JOB_QUEUED
):
1081 if j
.status
== status
:
def countSlaves(self):
    """Return how many slaves are currently in `self.slaves`."""
    total = len(self.slaves)
    return total
1089 def removeJob(self
, job
, clear_files
= False):
1090 self
.jobs
.remove(job
)
1091 self
.jobs_map
.pop(job
.id)
1094 shutil
.rmtree(job
.save_path
)
1096 for slave
in self
.slaves
:
1097 if slave
.job
== job
:
1099 slave
.job_frames
= []
1101 def addJob(self
, job
):
1102 self
.jobs
.append(job
)
1103 self
.jobs_map
[job
.id] = job
1105 # create job directory
1106 job
.save_path
= os
.path
.join(self
.path
, "job_" + job
.id)
1107 verifyCreateDir(job
.save_path
)
def getJobID(self, id):
    """Look up a job by id in `jobs_map`; returns None when absent."""
    known_jobs = self.jobs_map
    return known_jobs.get(id)
1115 for job
in self
.jobs
:
1118 def newDispatch(self
, slave
):
1120 for job
in self
.jobs
:
1122 not self
.balancer
.applyExceptions(job
) # No exceptions
1123 and slave
.id not in job
.blacklist
# slave is not blacklisted
1124 and (not slave
.tags
or job
.tags
.issubset(slave
.tags
)) # slave doesn't use tags or slave has all job tags
1127 return job
, job
.getFrames()
1131 def clearMaster(path
):
1134 def createMaster(address
, clear
, force
, path
):
1135 filepath
= os
.path
.join(path
, "blender_master.data")
1137 if not clear
and os
.path
.exists(filepath
):
1138 print("loading saved master:", filepath
)
1139 with
open(filepath
, 'rb') as f
:
1140 path
, jobs
, slaves
= pickle
.load(f
)
1142 httpd
= RenderMasterServer(address
, RenderHandler
, path
, force
=force
, subdir
=False)
1143 httpd
.restore(jobs
, slaves
)
1147 return RenderMasterServer(address
, RenderHandler
, path
, force
=force
)
def saveMaster(path, httpd):
    """Pickle the master's (path, jobs, slaves) to `blender_master.data` in `path`."""
    target = os.path.join(path, "blender_master.data")

    with open(target, 'wb') as out:
        # The state tuple is built only after the file is open, keeping the
        # original order of side effects.
        state = (httpd.path, httpd.jobs, httpd.slaves)
        pickle.dump(state, out, pickle.HIGHEST_PROTOCOL)
1155 def runMaster(address
, broadcast
, clear
, force
, path
, update_stats
, test_break
,use_ssl
=False,cert_path
="",key_path
=""):
1156 httpd
= createMaster(address
, clear
, force
, path
)
1158 httpd
.stats
= update_stats
1161 httpd
.socket
= ssl
.wrap_socket(
1167 ssl_version
=ssl
.PROTOCOL_SSLv23
,
1170 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
)
1171 s
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_BROADCAST
, 1)
1173 start_time
= time
.time() - 2
1175 while not test_break():
1177 httpd
.handle_request()
1178 except select
.error
:
1181 if time
.time() - start_time
>= 2: # need constant here
1182 httpd
.timeoutSlaves()
1187 print("broadcasting address")
1188 s
.sendto(bytes("%i" % address
[1], encoding
='utf8'), 0, ('<broadcast>', 8000))
1189 start_time
= time
.time()
1191 httpd
.server_close()
1193 clearMaster(httpd
.path
)
1195 saveMaster(path
, httpd
)