From 80f105a16fe27df9901118bbdc212dc127d7f3a3 Mon Sep 17 00:00:00 2001 From: Johannes Sixt Date: Wed, 6 Sep 2006 19:59:41 +0000 Subject: [PATCH] r867: Merge 2.1: Big renderfarm update. This revision should match GIT::d325701ef2ca74f1885b2ba2ccf24ae1d1c295ed. --- cinelerra/channelpicker.C | 12 +- cinelerra/recordvideo.C | 28 ++--- cinelerra/renderfarm.C | 276 ++++++++++++++++++++++++++++------------- cinelerra/renderfarm.h | 85 ++++++++++++- cinelerra/renderfarm.inc | 3 +- cinelerra/renderfarmclient.C | 221 ++++++++++++++++++++------------- cinelerra/renderfarmclient.h | 43 +++++-- cinelerra/renderfarmclient.inc | 1 + cinelerra/tunerserver.C | 2 +- 9 files changed, 460 insertions(+), 211 deletions(-) diff --git a/cinelerra/channelpicker.C b/cinelerra/channelpicker.C index e33ac83b..5ef806fa 100644 --- a/cinelerra/channelpicker.C +++ b/cinelerra/channelpicker.C @@ -21,12 +21,12 @@ #include "videodevice.h" RecordChannelPicker::RecordChannelPicker(MWindow *mwindow, - Record *record, - RecordMonitor *record_monitor, + Record *record, + RecordMonitor *record_monitor, BC_WindowBase *parent_window, - ChannelDB *channeldb, - int x, - int y) + ChannelDB *channeldb, + int x, + int y) : ChannelPicker(channeldb, x, y, @@ -517,7 +517,7 @@ ChannelButton::ChannelButton(ChannelPicker *channel_picker, int x, int y) : BC_Button(x, - y, + y, channel_picker->get_theme() ? channel_picker->get_theme()->get_image_set("channel") : 0) diff --git a/cinelerra/recordvideo.C b/cinelerra/recordvideo.C index 8eae81e0..33d37ea7 100644 --- a/cinelerra/recordvideo.C +++ b/cinelerra/recordvideo.C @@ -134,22 +134,22 @@ int RecordVideo::cleanup_recording() void RecordVideo::get_capture_frame() { if(!capture_frame) -{ - if(record->fixed_compression) { - capture_frame = new VFrame; - } - else - { - capture_frame = new VFrame(0, - record->default_asset->width, - record->default_asset->height, - record->vdevice->get_best_colormodel(record->default_asset)); + if(record->fixed_compression) + { + capture_frame = new VFrame; + } + else + { + capture_frame = new VFrame(0, + record->default_asset->width, + record->default_asset->height, + record->vdevice->get_best_colormodel(record->default_asset)); //printf("RecordVideo::get_capture_frame %d %d\n", capture_frame->get_w(), capture_frame->get_h()); - } - frame_ptr = new VFrame**[1]; - frame_ptr[0] = new VFrame*[1]; - frame_ptr[0][0] = capture_frame; + } + frame_ptr = new VFrame**[1]; + frame_ptr[0] = new VFrame*[1]; + frame_ptr[0][0] = capture_frame; } } diff --git a/cinelerra/renderfarm.C b/cinelerra/renderfarm.C index 9c5d03b3..b8cc782b 100644 --- a/cinelerra/renderfarm.C +++ b/cinelerra/renderfarm.C @@ -1,6 +1,7 @@ #include "asset.h" #include "brender.h" #include "clip.h" +#include "condition.h" #include "bchash.h" #include "edl.h" #include "filesystem.h" @@ -11,7 +12,7 @@ #include "preferences.h" #include "render.h" #include "renderfarm.h" -//#include "renderfarmfsserver.h" +#include "renderfarmclient.h" #include "bctimer.h" #include "transportque.h" @@ -21,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -113,6 +115,9 @@ RenderFarmServerThread::RenderFarmServerThread(ArrayList *plugind this->number = number; socket_fd = -1; frames_per_second = 0; + watchdog = 0; + buffer = 0; + datagram = 0; Thread::set_synchronous(1); } @@ -124,6 +129,9 @@ RenderFarmServerThread::~RenderFarmServerThread() Thread::join(); //printf("RenderFarmServerThread::~RenderFarmServerThread 1\n"); if(socket_fd >= 0) close(socket_fd); + if(watchdog) delete watchdog; + if(buffer) delete [] buffer; + if(datagram) delete [] datagram; //printf("RenderFarmServerThread::~RenderFarmServerThread 2\n"); } @@ -226,6 +234,11 @@ int RenderFarmServerThread::start_loop() if(socket_fd < 0) result = 1; + if(!result) + { + watchdog = new RenderFarmWatchdog(this, 0); + watchdog->start(); + } if(!result) Thread::start(); @@ -242,94 +255,78 @@ int RenderFarmServerThread::start_loop() -int RenderFarmServerThread::read_socket(int socket_fd, char *data, int len, int timeout) +int64_t RenderFarmServerThread::read_int64(int *error) +{ + int temp = 0; + if(!error) error = &temp; + + unsigned char data[sizeof(int64_t)]; + *error = (read_socket((char*)data, sizeof(int64_t)) != + sizeof(int64_t)); + +// Make it return 1 if error so it can be used to read a result code from the +// server. + int64_t result = 1; + if(!*error) + { + result = (((int64_t)data[0]) << 56) | + (((uint64_t)data[1]) << 48) | + (((uint64_t)data[2]) << 40) | + (((uint64_t)data[3]) << 32) | + (((uint64_t)data[4]) << 24) | + (((uint64_t)data[5]) << 16) | + (((uint64_t)data[6]) << 8) | + data[7]; + } + return result; +} + +int RenderFarmServerThread::write_int64(int64_t value) +{ + unsigned char data[sizeof(int64_t)]; + data[0] = (value >> 56) & 0xff; + data[1] = (value >> 48) & 0xff; + data[2] = (value >> 40) & 0xff; + data[3] = (value >> 32) & 0xff; + data[4] = (value >> 24) & 0xff; + data[5] = (value >> 16) & 0xff; + data[6] = (value >> 8) & 0xff; + data[7] = value & 0xff; + return (write_socket((char*)data, sizeof(int64_t)) != + sizeof(int64_t)); +} + + + +int RenderFarmServerThread::read_socket(char *data, int len) { int bytes_read = 0; int offset = 0; + watchdog->begin_request(); while(len > 0 && bytes_read >= 0) { - int result = 0; - if(timeout > 0) + enable_cancel(); + bytes_read = read(socket_fd, data + offset, len); + disable_cancel(); + if(bytes_read > 0) { - fd_set read_fds; - struct timeval tv; - - FD_ZERO(&read_fds); - FD_SET(socket_fd, &read_fds); - tv.tv_sec = timeout; - tv.tv_usec = 0; - - result = select(socket_fd + 1, - &read_fds, - 0, - 0, - &tv); - FD_ZERO(&read_fds); - } - else - result = 1; - - if(result) - { - bytes_read = read(socket_fd, data + offset, len); - if(bytes_read > 0) - { - len -= bytes_read; - offset += bytes_read; - } - else - { - break; - } + len -= bytes_read; + offset += bytes_read; } else - { + if(bytes_read < 0) break; - } } + watchdog->end_request(); return offset; } -int RenderFarmServerThread::write_socket(int socket_fd, char *data, int len, int timeout) +int RenderFarmServerThread::write_socket(char *data, int len) { - int result = 0; - if(timeout > 0) - { - fd_set write_fds; - struct timeval tv; - FD_ZERO(&write_fds); - FD_SET(socket_fd, &write_fds); - tv.tv_sec = timeout; - tv.tv_usec = 0; -//printf("RenderFarmServerThread::write_socket 1\n"); - result = select(socket_fd + 1, - 0, - &write_fds, - 0, - &tv); -//printf("RenderFarmServerThread::write_socket 2\n"); - FD_ZERO(&write_fds); - if(!result) - { -printf("RenderFarmServerThread::write_socket 1 socket timed out. len=%d\n", len); - return 0; - } - } - return write(socket_fd, data, len); } -int RenderFarmServerThread::read_socket(char *data, int len, int timeout) -{ - return read_socket(socket_fd, data, len, timeout); -} - -int RenderFarmServerThread::write_socket(char *data, int len, int timeout) -{ - return write_socket(socket_fd, data, len, timeout); -} - void RenderFarmServerThread::reallocate_buffer(int size) { if(buffer && buffer_allocated < size) @@ -350,19 +347,30 @@ void RenderFarmServerThread::run() // Wait for requests unsigned char header[5]; int done = 0; + int bytes_read = 0; buffer = 0; buffer_allocated = 0; // fs_server = new RenderFarmFSServer(this); // fs_server->initialize(); + + + +// Send command to run package renderer. + write_int64(RENDERFARM_PACKAGES); + + + while(!done) { // Wait for requests. // Requests consist of request ID's and accompanying buffers. // Get request ID. - if(read_socket(socket_fd, (char*)header, 5, -1) != 5) + bytes_read = read_socket((char*)header, 5); +//printf("RenderFarmServerThread::run 1\n"); + if(bytes_read != 5) { done = 1; continue; @@ -377,7 +385,9 @@ void RenderFarmServerThread::run() reallocate_buffer(request_size); // Get accompanying buffer - if(read_socket((char*)buffer, request_size, RENDERFARM_TIMEOUT) != request_size) + bytes_read = read_socket((char*)buffer, request_size); +//printf("RenderFarmServerThread::run 2 %d %lld %d\n", request_id, request_size, bytes_read); + if(bytes_read != request_size) { done = 1; continue; @@ -423,6 +433,8 @@ void RenderFarmServerThread::run() done = 1; break; + case RENDERFARM_KEEPALIVE: + break; default: // if(!fs_server->handle_request(request_id, request_size, (unsigned char*)buffer)) @@ -433,27 +445,34 @@ void RenderFarmServerThread::run() } //printf("RenderFarmServerThread::run 10 %d %lld\n", request_id, request_size); } - - if(buffer) delete [] buffer; + +// Don't let watchdog kill the entire renderfarm when a client finishes +// normally. + if(watchdog) + { //printf("RenderFarmServerThread::run 20\n"); + delete watchdog; + watchdog = 0; + } + // delete fs_server; } -int RenderFarmServerThread::write_string(int socket_fd, char *string) +int RenderFarmServerThread::write_string(char *string) { - unsigned char *datagram; int i, len; i = 0; len = strlen(string) + 1; - datagram = new unsigned char[len + 4]; + datagram = new char[len + 4]; STORE_INT32(len); memcpy(datagram + i, string, len); - write_socket(socket_fd, (char*)datagram, len + 4, RENDERFARM_TIMEOUT); + write_socket((char*)datagram, len + 4); //printf("RenderFarmServerThread::write_string %02x%02x%02x%02x\n", // datagram[0], datagram[1], datagram[2], datagram[3]); delete [] datagram; + datagram = 0; } void RenderFarmServerThread::send_preferences() @@ -463,7 +482,7 @@ void RenderFarmServerThread::send_preferences() server->preferences->save_defaults(&defaults); defaults.save_string(string); - write_string(socket_fd, string); + write_string(string); delete [] string; } @@ -489,8 +508,8 @@ void RenderFarmServerThread::send_asset() server->default_asset->write(&file, 0, 0); file.terminate_string(); - write_string(socket_fd, string1); - write_string(socket_fd, file.string); + write_string(string1); + write_string(file.string); delete [] string1; } @@ -508,7 +527,7 @@ void RenderFarmServerThread::send_edl() file.terminate_string(); //printf("RenderFarmServerThread::send_edl\n%s\n\n", file.string); - write_string(socket_fd, file.string); + write_string(file.string); //printf("RenderFarmServerThread::send_edl 2\n"); } @@ -528,14 +547,14 @@ void RenderFarmServerThread::send_package(unsigned char *buffer) server->use_local_rate); //printf("RenderFarmServerThread::send_package 2\n"); - char datagram[BCTEXTLEN]; + datagram = new char[BCTEXTLEN]; // No more packages if(!package) { //printf("RenderFarmServerThread::send_package 1\n"); datagram[0] = datagram[1] = datagram[2] = datagram[3] = 0; - write_socket(datagram, 4, RENDERFARM_TIMEOUT); + write_socket(datagram, 4); } else // Encode package @@ -559,8 +578,10 @@ void RenderFarmServerThread::send_package(unsigned char *buffer) i = 0; STORE_INT32(len - 4); - write_socket(datagram, len, RENDERFARM_TIMEOUT); + write_socket(datagram, len); } + delete [] datagram; + datagram = 0; } @@ -588,7 +609,7 @@ int RenderFarmServerThread::set_video_map(unsigned char *buffer) ((u_int32_t)buffer[7])); char return_value[1]; return_value[0] = 0; - write_socket(return_value, 1, RENDERFARM_TIMEOUT); + write_socket(return_value, 1); return 0; } return 1; @@ -607,7 +628,86 @@ void RenderFarmServerThread::get_result() { unsigned char data[1]; data[0] = *server->result_return; - write_socket((char*)data, 1, RENDERFARM_TIMEOUT); + write_socket((char*)data, 1); +} + + + + + + + + + + + + + + +RenderFarmWatchdog::RenderFarmWatchdog( + RenderFarmServerThread *server, + RenderFarmClientThread *client) + : Thread(1, 0, 0) +{ + this->server = server; + this->client = client; + next_request = new Condition(0, "RenderFarmWatchdog::next_request", 0); + request_complete = new Condition(0, "RenderFarmWatchdog::request_complete", 0); + done = 0; +} + +RenderFarmWatchdog::~RenderFarmWatchdog() +{ + done = 1; + next_request->unlock(); + request_complete->unlock(); + join(); + delete next_request; + delete request_complete; } +void RenderFarmWatchdog::begin_request() +{ + next_request->unlock(); +} + +void RenderFarmWatchdog::end_request() +{ + request_complete->unlock(); +} + +void RenderFarmWatchdog::run() +{ + while(!done) + { + next_request->lock("RenderFarmWatchdog::run"); + + int result = request_complete->timed_lock(RENDERFARM_TIMEOUT * 1000000, + "RenderFarmWatchdog::run"); + + if(result) + { + if(client) + { + printf("RenderFarmWatchdog::run 1 killing pid %d\n", client->pid); +// client->cancel(); + kill(client->pid, SIGKILL); + } + else + if(server) + { + printf("RenderFarmWatchdog::run 1 killing thread %p\n", server); + server->cancel(); + unsigned char buffer[4]; + buffer[0] = 1; + server->set_result(buffer); + } + + done = 1; + } + } +} + + + diff --git a/cinelerra/renderfarm.h b/cinelerra/renderfarm.h index 187ee848..405283bb 100644 --- a/cinelerra/renderfarm.h +++ b/cinelerra/renderfarm.h @@ -6,6 +6,7 @@ #include "asset.inc" #include "brender.inc" #include "bchash.inc" +#include "condition.inc" #include "edl.inc" #include "mutex.inc" #include "mwindow.inc" @@ -20,6 +21,43 @@ #include + +// Renderfarm theory: +// The renderfarm starts a RenderFarmServerThread for each client +// listed in the preferences. +// The RenderFarmServerThread starts a RenderFarmWatchdog thread. +// write_socket and read_socket start the watchdog thread. If they don't +// return in a certain time, the watchdog thread assumes the client has crashed +// and kills RenderFarmServerThread. +// RenderFarmServerThread handles requests from the client once the +// connection is open. All the RenderFarmServerThread's are joined by the +// RenderFarmServer when the jobs are finished. +// +// On the client side, the process started by the user is a RenderFarmClient. +// It waits for connections from the server and starts a RenderFarmClientThread +// for each connection. RenderFarmClientThread is a thread but it in turn +// starts a fork for the actual rendering. A fork instead of a thread is +// used to avoid reentrancy problems with the +// codecs, but we still need a thread to join the process. +// +// The fork requests jobs from the server until the job table is empty +// or the server reports an error. This fork must poll the server +// after every frame for the error status. Also the fork creates a +// RenderFarmWatchdog thread to kill itself if a write_socket or read_socket +// doesn't return. +// +// RenderFarmClientThread detaches when finished. +// It doesn't account for the server command loop, which waits for read_socket +// indefinitely. This needs to be pinged periodically to keep the read_socket +// alive. +// +// Once, it tried to use a virtual file system to allow rendering clients without +// mounting the filesystem of the server. This proved impractical because of +// the many odd schemes used by file libraries. Abstracting "open" didn't +// work. Read ahead and caching were required to get decent performance. +// +// Whether it cleans up when timed out is unknown. + // Request format // 1 byte -> request code // 4 bytes -> size of packet exclusive @@ -67,6 +105,7 @@ // Request codes to be used in both client and server. enum { + RENDERFARM_NONE, RENDERFARM_PREFERENCES, // 0 Get preferences on startup RENDERFARM_ASSET, // Get output format on startup RENDERFARM_EDL, // Get EDL on startup @@ -76,8 +115,10 @@ enum RENDERFARM_GET_RESULT, // Retrieve error status RENDERFARM_DONE, // Quit RENDERFARM_SET_VMAP, // 8 Update video map in background rendering + RENDERFARM_COMMAND, // Get the client to run RENDERFARM_TUNER, // Run a tuner server RENDERFARM_PACKAGES, // Run packages + RENDERFARM_KEEPALIVE, // Keep alive // VFS commands RENDERFARM_FOPEN, @@ -130,7 +171,12 @@ public: Preferences *preferences; // Use master node's framerate int use_local_rate; +// These values are shared between the local renderer and the +// renderfarm server threads. +// The error code. +// Any nonzero value is an error and stops rendering. int *result_return; +// The total number of frames completed int64_t *total_return; Mutex *total_return_lock; Asset *default_asset; @@ -148,16 +194,19 @@ public: int number); ~RenderFarmServerThread(); - static int read_socket(int socket_fd, char *data, int len, int timeout); - static int write_socket(int socket_fd, char *data, int len, int timeout); + +// Used by both client and server + int write_int64(int64_t value); + int64_t read_int64(int *error); // Inserts header and writes string to socket - static int write_string(int socket_fd, char *string); + int write_string(char *string); static int open_client(char *hostname, int port); - int read_socket(char *data, int len, int timeout); - int write_socket(char *data, int len, int timeout); +// Used by server only + int read_socket(char *data, int len); + int write_socket(char *data, int len); int start_loop(); void send_preferences(); void send_asset(); @@ -175,16 +224,42 @@ public: // MWindow *mwindow; ArrayList *plugindb; RenderFarmServer *server; + RenderFarmWatchdog *watchdog; int socket_fd; int number; // Rate of last job or 0 double frames_per_second; // Pointer to default asset Asset *default_asset; +// These objects can be left dangling of the watchdog kills the thread. +// They are deleted in the destructor. unsigned char *buffer; int64_t buffer_allocated; + char *datagram; }; +class RenderFarmWatchdog : public Thread +{ +public: +// use_pid - causes it to kill the pid instead of cancel the thread +// Used for client. + RenderFarmWatchdog(RenderFarmServerThread *server, + RenderFarmClientThread *client); + ~RenderFarmWatchdog(); + +// Called at the beginning of a socket read + void begin_request(); +// Called when a socket read succeeds + void end_request(); + void run(); + + RenderFarmServerThread *server; + RenderFarmClientThread *client; + Condition *next_request; + Condition *request_complete; + int done; + int pid; +}; diff --git a/cinelerra/renderfarm.inc b/cinelerra/renderfarm.inc index 00e810c5..dfbdaa6c 100644 --- a/cinelerra/renderfarm.inc +++ b/cinelerra/renderfarm.inc @@ -4,9 +4,10 @@ class RenderFarmServer; class RenderFarmServerThread; +class RenderFarmKeepalive; // Number of seconds before giving up on a socket. // Change if VFS is slow. -#define RENDERFARM_TIMEOUT 60 +#define RENDERFARM_TIMEOUT 15 #endif diff --git a/cinelerra/renderfarmclient.C b/cinelerra/renderfarmclient.C index 41d896bf..be81b838 100644 --- a/cinelerra/renderfarmclient.C +++ b/cinelerra/renderfarmclient.C @@ -47,7 +47,6 @@ RenderFarmClient::RenderFarmClient(int port, this_pid = getpid(); nice(nice_value); - thread = new RenderFarmClientThread(this); MWindow::init_defaults(boot_defaults, config_path); boot_preferences = new Preferences; @@ -60,7 +59,7 @@ RenderFarmClient::RenderFarmClient(int port, RenderFarmClient::~RenderFarmClient() { - delete thread; +// delete thread; delete boot_defaults; delete boot_preferences; plugindb->remove_all_objects(); @@ -155,6 +154,8 @@ void RenderFarmClient::main_loop() else { //printf("RenderFarmClient::main_loop: Session started from %s\n", inet_ntoa(clientname.sin_addr)); + RenderFarmClientThread *thread = + new RenderFarmClientThread(this); thread->main_loop(new_socket_fd); } } @@ -172,6 +173,8 @@ void RenderFarmClient::main_loop() else { //printf("RenderFarmClient::main_loop: Session started from %s\n", clientname.sun_path); + RenderFarmClientThread *thread = + new RenderFarmClientThread(this); thread->main_loop(new_socket_fd); } } @@ -204,19 +207,23 @@ printf("RenderFarmClient::kill_client 2\n"); // after every frame for the error status. // Detaches when finished. RenderFarmClientThread::RenderFarmClientThread(RenderFarmClient *client) - : Thread() + : Thread(0, 0, 1) { this->client = client; frames_per_second = 0; Thread::set_synchronous(0); // fs_client = 0; mutex_lock = new Mutex("RenderFarmClientThread::mutex_lock"); + watchdog = 0; + keep_alive = 0; } RenderFarmClientThread::~RenderFarmClientThread() { // if(fs_client) delete fs_client; delete mutex_lock; + delete watchdog; + delete keep_alive; } @@ -231,46 +238,36 @@ int RenderFarmClientThread::send_request_header(int request, // printf("RenderFarmClientThread::send_request_header %d %02x%02x%02x%02x%02x\n", // request, datagram[0], datagram[1], datagram[2], datagram[3], datagram[4]); - return (write_socket((char*)datagram, - 5, - RENDERFARM_TIMEOUT) != 5); + return (write_socket((char*)datagram, 5) != 5); } -int RenderFarmClientThread::write_socket(char *data, int len, int timeout) +int RenderFarmClientThread::write_socket(char *data, int len) { - int result = RenderFarmServerThread::write_socket(socket_fd, - data, - len, - timeout); -// Assume the stream is offset and give up future accesses. - if(result <= 0) abort(); + return write(socket_fd, data, len); } -int RenderFarmClientThread::read_socket(char *data, int len, int timeout) +int RenderFarmClientThread::read_socket(char *data, int len) { - int result = RenderFarmServerThread::read_socket(socket_fd, - data, - len, - timeout); -// Assume the stream is offset and give up future accesses. - if(result <= 0) abort(); -} - -void RenderFarmClientThread::abort() -{ - send_completion(socket_fd); - close(socket_fd); - exit(1); -} + int bytes_read = 0; + int offset = 0; + watchdog->begin_request(); + while(len > 0 && bytes_read >= 0) + { + bytes_read = read(socket_fd, data + offset, len); + if(bytes_read > 0) + { + len -= bytes_read; + offset += bytes_read; + } + else + if(bytes_read < 0) + { + break; + } + } + watchdog->end_request(); -void RenderFarmClientThread::lock(char *location) -{ - mutex_lock->lock(location); -} - -void RenderFarmClientThread::unlock() -{ - mutex_lock->unlock(); + return offset; } int RenderFarmClientThread::write_int64(int64_t value) @@ -284,7 +281,7 @@ int RenderFarmClientThread::write_int64(int64_t value) data[5] = (value >> 16) & 0xff; data[6] = (value >> 8) & 0xff; data[7] = value & 0xff; - return (write_socket((char*)data, sizeof(int64_t), RENDERFARM_TIMEOUT) != sizeof(int64_t)); + return (write_socket((char*)data, sizeof(int64_t)) != sizeof(int64_t)); } int64_t RenderFarmClientThread::read_int64(int *error) @@ -293,7 +290,7 @@ int64_t RenderFarmClientThread::read_int64(int *error) if(!error) error = &temp; unsigned char data[sizeof(int64_t)]; - *error = (read_socket((char*)data, sizeof(int64_t), RENDERFARM_TIMEOUT) != sizeof(int64_t)); + *error = (read_socket((char*)data, sizeof(int64_t)) != sizeof(int64_t)); // Make it return 1 if error so it can be used to read a result code from the // server. @@ -315,7 +312,7 @@ int64_t RenderFarmClientThread::read_int64(int *error) void RenderFarmClientThread::read_string(char* &string) { unsigned char header[4]; - if(read_socket((char*)header, 4, RENDERFARM_TIMEOUT) != 4) + if(read_socket((char*)header, 4) != 4) { string = 0; return; @@ -329,7 +326,7 @@ void RenderFarmClientThread::read_string(char* &string) if(len) { string = new char[len]; - if(read_socket(string, len, RENDERFARM_TIMEOUT) != len) + if(read_socket(string, len) != len) { delete [] string; string = 0; @@ -340,6 +337,22 @@ void RenderFarmClientThread::read_string(char* &string) } +void RenderFarmClientThread::abort() +{ + send_completion(socket_fd); + close(socket_fd); + exit(1); +} + +void RenderFarmClientThread::lock(char *location) +{ + mutex_lock->lock(location); +} + +void RenderFarmClientThread::unlock() +{ + mutex_lock->unlock(); +} void RenderFarmClientThread::get_command(int socket_fd, int *command) { @@ -357,6 +370,7 @@ void RenderFarmClientThread::get_command(int socket_fd, int *command) void RenderFarmClientThread::read_preferences(int socket_fd, Preferences *preferences) { + lock("RenderFarmClientThread::read_preferences"); send_request_header(RENDERFARM_PREFERENCES, 0); @@ -368,12 +382,14 @@ void RenderFarmClientThread::read_preferences(int socket_fd, preferences->load_defaults(&defaults); delete [] string; + unlock(); } void RenderFarmClientThread::read_asset(int socket_fd, Asset *asset) { + lock("RenderFarmClientThread::read_asset"); send_request_header(RENDERFARM_ASSET, 0); @@ -402,12 +418,14 @@ void RenderFarmClientThread::read_asset(int socket_fd, Asset *asset) delete [] string1; delete [] string2; + unlock(); } void RenderFarmClientThread::read_edl(int socket_fd, EDL *edl, Preferences *preferences) { + lock("RenderFarmClientThread::read_edl"); send_request_header(RENDERFARM_EDL, 0); @@ -431,40 +449,12 @@ void RenderFarmClientThread::read_edl(int socket_fd, LOAD_ALL); - -// Tag input paths for VFS here. -// Create VFS object. - FileSystem fs; -// if(preferences->renderfarm_vfs) -// { -// fs_client = new RenderFarmFSClient(this); -// fs_client->initialize(); -// -// for(Asset *asset = edl->assets->first; -// asset; -// asset = asset->next) -// { -// char string2[BCTEXTLEN]; -// strcpy(string2, asset->path); -// sprintf(asset->path, RENDERFARM_FS_PREFIX "%s", string2); -// } -// } - -// for(Asset *asset = edl->assets->first; -// asset; -// asset = asset->next) -// { -// char string2[BCTEXTLEN]; -// strcpy(string2, asset->path); -// fs.join_names(asset->path, preferences->renderfarm_mountpoint, string2); -// } - - -//edl->dump(); + unlock(); } int RenderFarmClientThread::read_package(int socket_fd, RenderPackage *package) { + lock("RenderFarmClientThread::read_package"); send_request_header(RENDERFARM_PACKAGE, 4); @@ -476,7 +466,7 @@ int RenderFarmClientThread::read_package(int socket_fd, RenderPackage *package) int64_t fixed = !EQUIV(frames_per_second, 0.0) ? (int64_t)(frames_per_second * 65536.0) : 0; STORE_INT32(fixed); - write_socket((char*)datagram, 4, RENDERFARM_TIMEOUT); + write_socket((char*)datagram, 4); //printf("RenderFarmClientThread::read_package 1 %f %ld\n", frames_per_second, fixed); @@ -488,6 +478,7 @@ int RenderFarmClientThread::read_package(int socket_fd, RenderPackage *package) if(!data) { // printf(_("RenderFarmClientThread::read_package no output path recieved.\n")); + unlock(); return 1; } @@ -513,30 +504,40 @@ int RenderFarmClientThread::read_package(int socket_fd, RenderPackage *package) package->video_do = READ_INT32(data_ptr); delete [] data; + unlock(); return 0; } int RenderFarmClientThread::send_completion(int socket_fd) { - return send_request_header(RENDERFARM_DONE, - 0); + lock("RenderFarmClientThread::send_completion"); + int result = send_request_header(RENDERFARM_DONE, 0); + unlock(); + return result; } +void RenderFarmClientThread::ping_server() +{ + lock("RenderFarmClientThread::ping_server"); + send_request_header(RENDERFARM_KEEPALIVE, 0); + unlock(); +} + void RenderFarmClientThread::main_loop(int socket_fd) { - this->socket_fd = socket_fd; + this->socket_fd = socket_fd; - Thread::start(); + Thread::start(); } void RenderFarmClientThread::run() { // Create new memory space - int pid = fork(); + pid = fork(); if(pid != 0) { int return_value; @@ -544,13 +545,14 @@ void RenderFarmClientThread::run() return; } - - +// Get the pid of the fork if inside the fork + pid = getpid(); int socket_fd = this->socket_fd; + init_client_keepalive(); // Get command to run int command; @@ -578,6 +580,13 @@ SET_TRACE } +void RenderFarmClientThread::init_client_keepalive() +{ + keep_alive = new RenderFarmKeepalive(this); + keep_alive->start(); + watchdog = new RenderFarmWatchdog(0, this); + watchdog->start(); +} @@ -687,8 +696,45 @@ void RenderFarmClientThread::do_packages(int socket_fd) //printf("RenderFarmClientThread::run 11\n"); delete preferences; printf(_("RenderFarmClientThread::run: Session finished.\n")); +} - _exit(0); + + + + + + + + +RenderFarmKeepalive::RenderFarmKeepalive( + RenderFarmClientThread *client_thread) + : Thread(1, 0, 0) +{ + this->client_thread = client_thread; + done = 0; +} + +RenderFarmKeepalive::~RenderFarmKeepalive() +{ + done = 1; + cancel(); + join(); +} + + +void RenderFarmKeepalive::run() +{ + while(!done) + { + enable_cancel(); + sleep(5); + disable_cancel(); + if(!done) + { +// watchdog thread kills this if it gets stuck + client_thread->ping_server(); + } + } } @@ -698,6 +744,13 @@ printf(_("RenderFarmClientThread::run: Session finished.\n")); + + + + + + + FarmPackageRenderer::FarmPackageRenderer(RenderFarmClientThread *thread, int socket_fd) : PackageRenderer() @@ -720,7 +773,7 @@ int FarmPackageRenderer::get_result() 0); unsigned char data[1]; data[0] = 1; - if(thread->read_socket((char*)data, 1, RENDERFARM_TIMEOUT) != 1) + if(thread->read_socket((char*)data, 1) != 1) { thread->unlock(); return 1; @@ -736,7 +789,7 @@ void FarmPackageRenderer::set_result(int value) 1); unsigned char data[1]; data[0] = value; - thread->write_socket((char*)data, 1, RENDERFARM_TIMEOUT); + thread->write_socket((char*)data, 1); thread->unlock(); } @@ -748,7 +801,7 @@ void FarmPackageRenderer::set_progress(int64_t total_samples) unsigned char datagram[4]; int i = 0; STORE_INT32(total_samples); - thread->write_socket((char*)datagram, 4, RENDERFARM_TIMEOUT); + thread->write_socket((char*)datagram, 4); thread->unlock(); } @@ -764,10 +817,10 @@ int FarmPackageRenderer::set_video_map(int64_t position, int value) 8); STORE_INT32(position); STORE_INT32(value); - thread->write_socket((char*)datagram, 8, RENDERFARM_TIMEOUT); + thread->write_socket((char*)datagram, 8); // Get completion since the GUI may be locked for a long time. - if(!thread->read_socket(return_value, 1, RENDERFARM_TIMEOUT)) + if(!thread->read_socket(return_value, 1)) { result = 1; } diff --git a/cinelerra/renderfarmclient.h b/cinelerra/renderfarmclient.h index ae9ad511..699c1233 100644 --- a/cinelerra/renderfarmclient.h +++ b/cinelerra/renderfarmclient.h @@ -9,6 +9,7 @@ #include "packagerenderer.h" #include "pluginserver.inc" #include "preferences.inc" +#include "renderfarm.inc" #include "renderfarmclient.inc" //#include "renderfarmfsclient.inc" #include "thread.h" @@ -30,7 +31,7 @@ public: // After a socket times out, kill the render node. void kill_client(); - RenderFarmClientThread *thread; +// RenderFarmClientThread *thread; int port; char *deamon_path; @@ -42,15 +43,7 @@ public: ArrayList *plugindb; }; -// When a connection is opened, the thread forks to handle the rendering session. -// A fork instead of a thread is used to avoid reentrancy problems with the -// codecs, but we still need a thread to join the process. -// -// The fork requests jobs from the server until the job table is empty -// or the server reports an error. This fork must poll the server -// after every frame for the error status. -// Detaches when finished. -class RenderFarmClientThread : Thread +class RenderFarmClientThread : public Thread { public: RenderFarmClientThread(RenderFarmClient *client); @@ -60,13 +53,16 @@ public: // The ID of the request followed by the size of the data that follows is sent. int send_request_header(int request, int len); - int write_socket(char *data, int len, int timeout); - int read_socket(char *data, int len, int timeout); +// These are local functions to handle errors the right way for a client. +// They simply call the RenderFarmServerThread functions and abort if error. + int write_socket(char *data, int len); + int read_socket(char *data, int len); // Return 1 if error int write_int64(int64_t number); int64_t read_int64(int *error = 0); void read_string(char* &string); void abort(); +// Lock access to the socket during complete transactions void lock(char *location); void unlock(); @@ -85,6 +81,8 @@ public: Preferences *preferences); int read_package(int socket_fd, RenderPackage *package); int send_completion(int socket_fd); + void ping_server(); + void init_client_keepalive(); void main_loop(int socket_fd); void run(); @@ -96,6 +94,10 @@ public: // RenderFarmFSClient *fs_client; double frames_per_second; Mutex *mutex_lock; + RenderFarmWatchdog *watchdog; + RenderFarmKeepalive *keep_alive; +// pid of forked process + int pid; }; @@ -129,5 +131,22 @@ public: +class RenderFarmKeepalive : public Thread +{ +public: + RenderFarmKeepalive(RenderFarmClientThread *client_thread); + ~RenderFarmKeepalive(); + + void run(); + + RenderFarmClientThread *client_thread; + int done; +}; + + + + + + #endif diff --git a/cinelerra/renderfarmclient.inc b/cinelerra/renderfarmclient.inc index 52ea789e..4853f8bb 100644 --- a/cinelerra/renderfarmclient.inc +++ b/cinelerra/renderfarmclient.inc @@ -5,5 +5,6 @@ class RenderFarmClient; class RenderFarmClientThread; class FarmPackageRenderer; +class RenderFarmWatchdog; #endif diff --git a/cinelerra/tunerserver.C b/cinelerra/tunerserver.C index 4e64a2a1..28d60c90 100644 --- a/cinelerra/tunerserver.C +++ b/cinelerra/tunerserver.C @@ -115,7 +115,7 @@ void TunerServer::main_loop() int bytes_read = read_data(temp, size); error = client->write_int64(bytes_read); if(!error) - error = client->write_socket((char*)temp, bytes_read, RENDERFARM_TIMEOUT); + error = client->write_socket((char*)temp, bytes_read); break; } -- 2.11.4.GIT