7 #include "filesystem.h"
11 #include "packagedispatcher.h"
12 #include "preferences.h"
14 #include "renderfarm.h"
15 #include "renderfarmclient.h"
17 #include "transportque.h"
20 #include <arpa/inet.h>
24 #include <netinet/in.h>
28 #include <sys/socket.h>
29 #include <sys/types.h>
// Constructs the render farm server.
// Copies the caller-supplied pointers and flags (plugin database, package
// dispatcher, preferences, result/total return slots and their lock, default
// asset, background-render object) into the matching members and allocates
// the mutex that guards the client list.
// NOTE(review): several parameter lines are not visible in this excerpt;
// the member assignments below show which values are stored.
36 RenderFarmServer::RenderFarmServer(ArrayList<PluginServer*> *plugindb,
37 PackageDispatcher *packages,
38 Preferences *preferences,
41 int64_t *total_return,
42 Mutex *total_return_lock,
47 this->plugindb = plugindb;
48 this->packages = packages;
49 this->preferences = preferences;
50 this->use_local_rate = use_local_rate;
51 this->result_return = result_return;
52 this->total_return = total_return;
53 this->total_return_lock = total_return_lock;
54 this->default_asset = default_asset;
56 this->brender = brender;
// Heap-allocated mutex; it is locked around client creation in start_clients().
// NOTE(review): confirm the destructor releases it on a line not shown here.
57 client_lock = new Mutex("RenderFarmServer::client_lock");
// Destructor. The visible statement empties the `clients` list.
// NOTE(review): presumably remove_all_objects() also deletes each
// RenderFarmServerThread -- confirm against ArrayList.
60 RenderFarmServer::~RenderFarmServer()
62 clients.remove_all_objects();
66 // Open connections to clients.
// Creates one RenderFarmServerThread per enabled node and starts it.
// The loop stops at the first node whose start_loop() returns nonzero.
// NOTE(review): the declaration of `result` and the return statement are not
// visible here; presumably `result` is returned -- confirm.
67 int RenderFarmServer::start_clients()
71 for(int i = 0; i < preferences->get_enabled_nodes() && !result; i++)
// client_lock is held while the new thread object is created, appended to
// `clients` and started.
73 client_lock->lock("RenderFarmServer::start_clients");
74 RenderFarmServerThread *client = new RenderFarmServerThread(plugindb,
// The client is appended before start_loop() runs, so it stays in the list
// even when start_loop() fails.
77 clients.append(client);
79 result = client->start_loop();
80 client_lock->unlock();
86 // The render farm must wait for all the clients to finish.
// The only visible statement removes every object from `clients`.
// NOTE(review): presumably this deletes each RenderFarmServerThread and the
// thread destructor joins the thread (the thread constructor calls
// Thread::set_synchronous(1)) -- confirm.
87 int RenderFarmServer::wait_clients()
89 //printf("RenderFarmServer::wait_clients 1\n");
90 clients.remove_all_objects();
91 //printf("RenderFarmServer::wait_clients 2\n");
106 // Waits for requests from every client.
107 // Joins when the client is finished.
// Constructor: stores the plugin database, the owning server and this
// thread's node number, and zeroes frames_per_second.
// NOTE(review): initialisation of socket_fd, watchdog, buffer and datagram is
// not visible in this excerpt; the destructor tests all four, so confirm they
// are initialised on the hidden lines.
108 RenderFarmServerThread::RenderFarmServerThread(ArrayList<PluginServer*> *plugindb,
109 RenderFarmServer *server,
113 this->plugindb = plugindb;
114 this->server = server;
115 this->number = number;
117 frames_per_second = 0;
// NOTE(review): presumably marks the thread as joinable -- confirm in Thread.
121 Thread::set_synchronous(1);
// Destructor: closes the socket if one is open and frees the watchdog, the
// request buffer and the datagram buffer.
// NOTE(review): a statement between the two commented-out printf lines is not
// visible here (possibly a thread join) -- confirm.
126 RenderFarmServerThread::~RenderFarmServerThread()
128 //printf("RenderFarmServerThread::~RenderFarmServerThread 1 %p\n", this);
130 //printf("RenderFarmServerThread::~RenderFarmServerThread 1\n");
// A negative socket_fd means no descriptor is open.
131 if(socket_fd >= 0) close(socket_fd);
// The null checks are redundant: delete and delete[] are no-ops on a null pointer.
132 if(watchdog) delete watchdog;
133 if(buffer) delete [] buffer;
134 if(datagram) delete [] datagram;
135 //printf("RenderFarmServerThread::~RenderFarmServerThread 2\n");
// Opens a stream socket to a render node.
//   hostname - a path starting with '/' selects a UNIX-domain socket;
//              anything else is resolved as an Internet host name.
//   port     - TCP port; used only on the Internet path.
// On failure socket_fd is set to -1.
// NOTE(review): the return statement is not visible; start_loop() assigns the
// return value to socket_fd and treats a negative value as failure.
139 int RenderFarmServerThread::open_client(char *hostname, int port)
144 // Open file for master node
145 if(hostname[0] == '/')
147 if((socket_fd = socket(PF_UNIX, SOCK_STREAM, 0)) < 0)
// NOTE(review): the message names start_loop although this is open_client,
// and perror() appends its own ": <error>" text after the embedded newline.
149 perror(_("RenderFarmServerThread::start_loop: socket\n"));
154 struct sockaddr_un addr;
155 addr.sun_family = AF_FILE;
// NOTE(review): strcpy is unbounded; sun_path is a fixed-size array, so a
// hostname longer than it overflows addr. Consider a length check.
156 strcpy(addr.sun_path, hostname);
// Address length passed to connect(): offset of sun_path plus the path and
// its terminating NUL.
157 int size = (offsetof(struct sockaddr_un, sun_path) +
158 strlen(hostname) + 1);
160 // The master node is always created by BRender. Keep trying for 30 seconds.
// Delay between attempts, passed to usleep() (microseconds): 100000 = 0.1 s.
162 #define ATTEMPT_DELAY 100000
168 if(connect(socket_fd, (struct sockaddr*)&addr, size) < 0)
// 30000000 / ATTEMPT_DELAY = 300 attempts, i.e. about 30 seconds of retries.
171 if(attempt > 30000000 / ATTEMPT_DELAY)
173 fprintf(stderr, _("RenderFarmServerThread::open_client: %s: %s\n"),
179 usleep(ATTEMPT_DELAY);
183 }while(!result && !done);
// Internet (TCP) path.
189 if((socket_fd = socket(PF_INET, SOCK_STREAM, 0)) < 0)
191 perror(_("RenderFarmServerThread::start_loop: socket"));
197 struct sockaddr_in addr;
198 struct hostent *hostinfo;
199 addr.sin_family = AF_INET;
200 addr.sin_port = htons(port);
// NOTE(review): gethostbyname() is obsolete, IPv4-oriented and returns a
// pointer to static storage; this method runs once per node, so getaddrinfo()
// would be the safer replacement.
201 hostinfo = gethostbyname(hostname);
204 fprintf(stderr, _("RenderFarmServerThread::open_client: unknown host %s.\n"),
210 addr.sin_addr = *(struct in_addr *) hostinfo->h_addr;
212 if(connect(socket_fd, (struct sockaddr*)&addr, sizeof(addr)) < 0)
214 fprintf(stderr, _("RenderFarmServerThread::open_client: %s: %s\n"),
// NOTE(review): no close() is visible before the descriptor is overwritten,
// so a socket that was created but failed to connect may leak -- confirm
// against the hidden lines.
223 if(result) socket_fd = -1;
// Connects to this thread's node and, on success, starts the thread.
// Host name and port come from the server preferences, indexed by `number`.
// NOTE(review): the declaration of `result` and the return statement are not
// visible; RenderFarmServer::start_clients() treats a nonzero return as failure.
228 int RenderFarmServerThread::start_loop()
232 socket_fd = open_client(server->preferences->get_node_hostname(number),
233 server->preferences->get_node_port(number));
235 if(socket_fd < 0) result = 1;
// Watchdog constructed with this thread as `server` and a null `client`.
// NOTE(review): the guarding condition and the watchdog start are not visible.
239 watchdog = new RenderFarmWatchdog(this, 0);
243 if(!result) Thread::start();
// Reads an 8-byte integer from the socket, most significant byte first.
//   error - optional out-parameter; when null a local temporary is used
//           instead. Set nonzero when read_socket() does not return the
//           expected count (the comparison operand is on a hidden line).
258 int64_t RenderFarmServerThread::read_int64(int *error)
261 if(!error) error = &temp;
263 unsigned char data[sizeof(int64_t)];
264 *error = (read_socket((char*)data, sizeof(int64_t)) !=
267 // Make it return 1 if error so it can be used to read a result code from the
// NOTE(review): only data[0] is cast to the signed int64_t before shifting;
// for data[0] >= 0x80 a signed left shift by 56 is undefined before C++20.
// The other bytes use uint64_t; using it here too would be consistent.
272 result = (((int64_t)data[0]) << 56) |
273 (((uint64_t)data[1]) << 48) |
274 (((uint64_t)data[2]) << 40) |
275 (((uint64_t)data[3]) << 32) |
276 (((uint64_t)data[4]) << 24) |
277 (((uint64_t)data[5]) << 16) |
278 (((uint64_t)data[6]) << 8) |
// Writes `value` to the socket as 8 bytes, most significant byte first
// (the same byte order read_int64() decodes).
// Returns the result of comparing write_socket()'s return value with an
// operand on a hidden line; presumably nonzero means failure -- confirm.
284 int RenderFarmServerThread::write_int64(int64_t value)
286 unsigned char data[sizeof(int64_t)];
287 data[0] = (value >> 56) & 0xff;
288 data[1] = (value >> 48) & 0xff;
289 data[2] = (value >> 40) & 0xff;
290 data[3] = (value >> 32) & 0xff;
291 data[4] = (value >> 24) & 0xff;
292 data[5] = (value >> 16) & 0xff;
293 data[6] = (value >> 8) & 0xff;
294 data[7] = value & 0xff;
295 return (write_socket((char*)data, sizeof(int64_t)) !=
// Reads up to `len` bytes from socket_fd into `data`, looping over read().
// The whole transfer is bracketed by watchdog->begin_request() and
// watchdog->end_request().
// NOTE(review): the updates to `len`, any end-of-file handling (read()
// returning 0) and the return value are on lines not visible here. Callers
// compare the return value with the requested length.
301 int RenderFarmServerThread::read_socket(char *data, int len)
305 watchdog->begin_request();
// Loop until everything requested has arrived or read() reports an error
// (a negative return).
306 while(len > 0 && bytes_read >= 0)
309 bytes_read = read(socket_fd, data + offset, len);
314 offset += bytes_read;
320 watchdog->end_request();
// Writes `len` bytes from `data` to socket_fd with one write() call and
// returns write()'s result (bytes written, or -1 on error).
// NOTE(review): write() may transfer fewer bytes than requested; unlike
// read_socket() there is no retry loop and no watchdog bracketing here.
325 int RenderFarmServerThread::write_socket(char *data, int len)
327 return write(socket_fd, data, len);
// Makes `buffer` able to hold at least `size` bytes.
// NOTE(review): the body of the first `if` and the condition guarding the
// allocation are not visible; presumably a too-small buffer is freed and a
// new one is allocated only when none exists -- confirm.
330 void RenderFarmServerThread::reallocate_buffer(int size)
332 if(buffer && buffer_allocated < size)
340 buffer = new unsigned char[size];
341 buffer_allocated = size;
// Thread body: sends RENDERFARM_PACKAGES to the node, then serves its
// requests. Each request starts with a 5-byte header: one request-id byte
// followed by a 4-byte payload size, most significant byte first. The payload
// is read into `buffer` and the request is dispatched on its id.
// NOTE(review): the loop construct, its exit conditions and most case bodies
// are on lines not visible in this excerpt.
345 void RenderFarmServerThread::run()
348 unsigned char header[5];
354 buffer_allocated = 0;
355 // fs_server = new RenderFarmFSServer(this);
356 // fs_server->initialize();
360 // Send command to run package renderer.
361 write_int64(RENDERFARM_PACKAGES);
368 // Wait for requests.
369 // Requests consist of request ID's and accompanying buffers.
371 bytes_read = read_socket((char*)header, 5);
372 //printf("RenderFarmServerThread::run 1\n");
379 int request_id = header[0];
380 int64_t request_size = (((u_int32_t)header[1]) << 24) |
381 (((u_int32_t)header[2]) << 16) |
382 (((u_int32_t)header[3]) << 8) |
383 (u_int32_t)header[4];
// NOTE(review): request_size comes straight from the peer and narrows to the
// int parameter of reallocate_buffer(); no upper bound check is visible.
385 reallocate_buffer(request_size);
387 // Get accompanying buffer
388 bytes_read = read_socket((char*)buffer, request_size);
389 //printf("RenderFarmServerThread::run 2 %d %lld %d\n", request_id, request_size, bytes_read);
// A short read means the payload did not arrive in full; the handling is on
// hidden lines.
390 if(bytes_read != request_size)
395 //printf("RenderFarmServerThread::run 3\n");
399 case RENDERFARM_PREFERENCES:
403 case RENDERFARM_ASSET:
411 case RENDERFARM_PACKAGE:
412 send_package(buffer);
415 case RENDERFARM_PROGRESS:
416 set_progress(buffer);
419 case RENDERFARM_SET_RESULT:
423 case RENDERFARM_SET_VMAP:
424 set_video_map(buffer);
427 case RENDERFARM_GET_RESULT:
431 case RENDERFARM_DONE:
432 //printf("RenderFarmServerThread::run 10\n");
436 case RENDERFARM_KEEPALIVE:
440 // if(!fs_server->handle_request(request_id, request_size, (unsigned char*)buffer))
// Unrecognised request ids are only logged.
442 printf(_("RenderFarmServerThread::run: unknown request %02x\n"), request_id);
446 //printf("RenderFarmServerThread::run 10 %d %lld\n", request_id, request_size);
449 // Don't let watchdog kill the entire renderfarm when a client finishes
453 //printf("RenderFarmServerThread::run 20\n");
// Sends a NUL-terminated string to the node as one datagram: a 4-byte prefix
// followed by the string bytes, terminator included.
// NOTE(review): the lines that fill the prefix (presumably the length) and
// that free `datagram` and return are not visible -- confirm.
461 int RenderFarmServerThread::write_string(char *string)
// len counts the terminating NUL.
466 len = strlen(string) + 1;
467 datagram = new char[len + 4];
469 memcpy(datagram + i, string, len);
// NOTE(review): the result of write_socket() is not checked on this line.
470 write_socket((char*)datagram, len + 4);
471 //printf("RenderFarmServerThread::write_string %02x%02x%02x%02x\n",
472 // datagram[0], datagram[1], datagram[2], datagram[3]);
// Sends the server preferences to the node: they are saved into `defaults`,
// rendered to a string and sent with write_string().
// NOTE(review): the declarations of `defaults` and `string`, and any cleanup,
// are on lines not visible here.
478 void RenderFarmServerThread::send_preferences()
483 server->preferences->save_defaults(&defaults);
484 defaults.save_string(string);
485 write_string(string);
// Sends the default asset to the node as two strings: first the output of
// save_defaults(), then the output of Asset::write().
// NOTE(review): the declarations of `defaults`, `string1` and `file`, and the
// remaining save_defaults() arguments, are on lines not visible here.
490 void RenderFarmServerThread::send_asset()
495 // The asset must be sent in two segments.
496 // One segment is stored in the EDL and contains decoding information.
497 // One segment is stored in the asset and contains encoding information.
498 server->default_asset->save_defaults(&defaults,
505 defaults.save_string(string1);
508 server->default_asset->write(&file, 0, 0);
509 file.terminate_string();
// The order matters: string1 is sent before file.string.
511 write_string(string1);
512 write_string(file.string);
// Sends the EDL to the node: it is saved as XML into `file`, terminated, and
// sent with write_string().
// NOTE(review): the declaration of `file` and the remaining save_xml()
// arguments are on lines not visible here.
517 void RenderFarmServerThread::send_edl()
522 server->edl->save_xml(plugindb,
527 file.terminate_string();
528 //printf("RenderFarmServerThread::send_edl\n%s\n\n", file.string);
530 write_string(file.string);
531 //printf("RenderFarmServerThread::send_edl 2\n");
// Handles a package request from the node.
//   buffer - request payload; its first four bytes (most significant first)
//            are converted to double, divided by a value on a hidden line,
//            and stored as the node's frames_per_second.
// A package is fetched from the dispatcher and serialised into `datagram`:
// the path followed by 32-bit fields. A datagram of four zero bytes is also
// written on one branch.
// NOTE(review): the branch conditions, the STORE_INT32 definition and the
// setup of `i` and `len` are not visible; presumably the zero datagram means
// no package is available -- confirm.
535 void RenderFarmServerThread::send_package(unsigned char *buffer)
537 this->frames_per_second = (double)((((u_int32_t)buffer[0]) << 24) |
538 (((u_int32_t)buffer[1]) << 16) |
539 (((u_int32_t)buffer[2]) << 8) |
540 ((u_int32_t)buffer[3])) /
543 //printf("RenderFarmServerThread::send_package 1 %f\n", frames_per_second);
544 RenderPackage *package =
545 server->packages->get_package(frames_per_second,
547 server->use_local_rate);
549 //printf("RenderFarmServerThread::send_package 2\n");
// Fixed-size output buffer of BCTEXTLEN bytes.
550 datagram = new char[BCTEXTLEN];
555 //printf("RenderFarmServerThread::send_package 1\n");
556 datagram[0] = datagram[1] = datagram[2] = datagram[3] = 0;
557 write_socket(datagram, 4);
562 //printf("RenderFarmServerThread::send_package 10\n");
// NOTE(review): strcpy is unbounded; the path plus the fields below must fit
// in BCTEXTLEN bytes -- confirm the length of package->path is limited.
564 strcpy(&datagram[i], package->path);
565 i += strlen(package->path);
568 STORE_INT32(package->audio_start);
569 STORE_INT32(package->audio_end);
570 STORE_INT32(package->video_start);
571 STORE_INT32(package->video_end);
// The background-render pointer is reduced to a 0/1 flag.
572 int use_brender = (server->brender ? 1 : 0);
573 STORE_INT32(use_brender);
574 STORE_INT32(package->audio_do);
575 STORE_INT32(package->video_do);
// len - 4 is presumably the payload size without the 4-byte length prefix.
579 STORE_INT32(len - 4);
581 write_socket(datagram, len);
// Adds a progress increment reported by the node to the shared total.
//   buffer - payload whose first four bytes hold an unsigned 32-bit value,
//            most significant byte first.
// The addition is done while server->total_return_lock is held.
588 void RenderFarmServerThread::set_progress(unsigned char *buffer)
590 server->total_return_lock->lock("RenderFarmServerThread::set_progress");
// The (int64_t) cast binds to the first shifted term only; the OR of the four
// terms is evaluated first and then added to the total.
591 *server->total_return += (int64_t)(((u_int32_t)buffer[0]) << 24) |
592 (((u_int32_t)buffer[1]) << 16) |
593 (((u_int32_t)buffer[2]) << 8) |
594 ((u_int32_t)buffer[3]);
595 server->total_return_lock->unlock();
// Forwards a video-map update from the node to the background renderer and
// writes a 1-byte reply.
//   buffer - payload holding two unsigned 32-bit values, most significant
//            byte first: bytes 0..3 and bytes 4..7, passed in that order to
//            server->brender->set_video_map().
// NOTE(review): the assignment of return_value[0], any null check on
// server->brender and the return statement are on lines not visible here.
598 int RenderFarmServerThread::set_video_map(unsigned char *buffer)
602 server->brender->set_video_map((int64_t)(((u_int32_t)buffer[0]) << 24) |
603 (((u_int32_t)buffer[1]) << 16) |
604 (((u_int32_t)buffer[2]) << 8) |
605 ((u_int32_t)buffer[3]),
606 (int64_t)(((u_int32_t)buffer[4]) << 24) |
607 (((u_int32_t)buffer[5]) << 16) |
608 (((u_int32_t)buffer[6]) << 8) |
609 ((u_int32_t)buffer[7]));
610 char return_value[1];
612 write_socket(return_value, 1);
// Records a result code in the shared result slot.
//   buffer - payload whose first byte is the result code.
// The slot is written only while it is still zero, so an earlier nonzero
// result is never overwritten.
// NOTE(review): no lock is visible around this update, and RenderFarmWatchdog::run()
// also calls this method -- confirm whether synchronisation is needed.
619 void RenderFarmServerThread::set_result(unsigned char *buffer)
621 //printf("RenderFarmServerThread::set_result %p\n", buffer);
622 if(!*server->result_return)
623 *server->result_return = buffer[0];
// Sends the current value of the shared result slot to the node as one byte.
// The value is narrowed to unsigned char, so only its low 8 bits are sent.
627 void RenderFarmServerThread::get_result()
629 unsigned char data[1];
630 data[0] = *server->result_return;
631 write_socket((char*)data, 1);
// Constructs a watchdog for a server thread or a client thread.
// RenderFarmServerThread::start_loop() passes itself as `server` and 0 as `client`.
// Allocates the two conditions used by begin_request(), end_request() and run().
// NOTE(review): the meaning of the numeric Condition arguments is not visible
// here; presumably the first is the initial value -- confirm.
647 RenderFarmWatchdog::RenderFarmWatchdog(
648 RenderFarmServerThread *server,
649 RenderFarmClientThread *client)
652 this->server = server;
653 this->client = client;
654 next_request = new Condition(0, "RenderFarmWatchdog::next_request", 0);
655 request_complete = new Condition(0, "RenderFarmWatchdog::request_complete", 0);
// Destructor: unlocks both conditions, then deletes request_complete.
// NOTE(review): the unlocks presumably release run() if it is blocked; the
// lines that stop or join the thread and delete next_request are not visible
// here -- confirm.
659 RenderFarmWatchdog::~RenderFarmWatchdog()
662 next_request->unlock();
663 request_complete->unlock();
666 delete request_complete;
// Signals that a request has started by unlocking next_request, which run()
// waits on. Called by RenderFarmServerThread::read_socket() before reading.
669 void RenderFarmWatchdog::begin_request()
671 next_request->unlock();
// Signals that the current request has finished by unlocking
// request_complete, which run() waits on with a timeout. Called by
// RenderFarmServerThread::read_socket() after reading.
674 void RenderFarmWatchdog::end_request()
676 request_complete->unlock();
// Watchdog thread body: waits for a request to begin, then waits with a
// timeout for it to complete. The visible timeout actions are: kill the
// client process with SIGKILL, or pass a 4-byte buffer to the server thread's
// set_result().
// NOTE(review): the loop, the test on `result`, the server/client branch
// conditions and the contents of `buffer` are on lines not visible here.
679 void RenderFarmWatchdog::run()
// Blocks until begin_request() unlocks next_request.
683 next_request->lock("RenderFarmWatchdog::run");
// NOTE(review): RENDERFARM_TIMEOUT * 1000000 is presumably seconds converted
// to microseconds -- confirm the unit Condition::timed_lock() expects.
685 int result = request_complete->timed_lock(RENDERFARM_TIMEOUT * 1000000,
686 "RenderFarmWatchdog::run");
692 printf("RenderFarmWatchdog::run 1 killing pid %d\n", client->pid);
694 kill(client->pid, SIGKILL);
699 printf("RenderFarmWatchdog::run 1 killing thread %p\n", server);
701 unsigned char buffer[4];
703 server->set_result(buffer);