r870: Merge 2.1:
[cinelerra_cv.git] / cinelerra / renderfarm.C
blobb8cc782bec229bf6b841a0b85ab4972b734a8049
1 #include "asset.h"
2 #include "brender.h"
3 #include "clip.h"
4 #include "condition.h"
5 #include "bchash.h"
6 #include "edl.h"
7 #include "filesystem.h"
8 #include "filexml.h"
9 #include "language.h"
10 #include "mutex.h"
11 #include "packagedispatcher.h"
12 #include "preferences.h"
13 #include "render.h"
14 #include "renderfarm.h"
15 #include "renderfarmclient.h"
16 #include "bctimer.h"
17 #include "transportque.h"
20 #include <arpa/inet.h>
21 #include <errno.h>
22 #include <fcntl.h>
23 #include <netdb.h>
24 #include <netinet/in.h>
25 #include <signal.h>
26 #include <stdio.h>
27 #include <string.h>
28 #include <sys/socket.h>
29 #include <sys/types.h>
30 #include <sys/un.h>
31 #include <unistd.h>
36 RenderFarmServer::RenderFarmServer(ArrayList<PluginServer*> *plugindb, 
37         PackageDispatcher *packages,
38         Preferences *preferences,
39         int use_local_rate,
40         int *result_return,
41         int64_t *total_return,
42         Mutex *total_return_lock,
43         Asset *default_asset,
44         EDL *edl,
45         BRender *brender)
47         this->plugindb = plugindb;
48         this->packages = packages;
49         this->preferences = preferences;
50         this->use_local_rate = use_local_rate;
51         this->result_return = result_return;
52         this->total_return = total_return;
53         this->total_return_lock = total_return_lock;
54         this->default_asset = default_asset;
55         this->edl = edl;
56         this->brender = brender;
57         client_lock = new Mutex("RenderFarmServer::client_lock");
60 RenderFarmServer::~RenderFarmServer()
62         clients.remove_all_objects();
63         delete client_lock;
66 // Open connections to clients.
67 int RenderFarmServer::start_clients()
69         int result = 0;
71         for(int i = 0; i < preferences->get_enabled_nodes() && !result; i++)
72         {
73                 client_lock->lock("RenderFarmServer::start_clients");
74                 RenderFarmServerThread *client = new RenderFarmServerThread(plugindb, 
75                         this, 
76                         i);
77                 clients.append(client);
79                 result = client->start_loop();
80                 client_lock->unlock();
81         }
83         return result;
86 // The render farm must wait for all the clients to finish.
87 int RenderFarmServer::wait_clients()
89 //printf("RenderFarmServer::wait_clients 1\n");
90         clients.remove_all_objects();
91 //printf("RenderFarmServer::wait_clients 2\n");
92         return 0;
106 // Waits for requests from every client.
107 // Joins when the client is finished.
108 RenderFarmServerThread::RenderFarmServerThread(ArrayList<PluginServer*> *plugindb, 
109         RenderFarmServer *server, 
110         int number)
111  : Thread()
113         this->plugindb = plugindb;
114         this->server = server;
115         this->number = number;
116         socket_fd = -1;
117         frames_per_second = 0;
118         watchdog = 0;
119         buffer = 0;
120         datagram = 0;
121         Thread::set_synchronous(1);
126 RenderFarmServerThread::~RenderFarmServerThread()
128 //printf("RenderFarmServerThread::~RenderFarmServerThread 1 %p\n", this);
129         Thread::join();
130 //printf("RenderFarmServerThread::~RenderFarmServerThread 1\n");
131         if(socket_fd >= 0) close(socket_fd);
132         if(watchdog) delete watchdog;
133         if(buffer) delete [] buffer;
134         if(datagram) delete [] datagram;
135 //printf("RenderFarmServerThread::~RenderFarmServerThread 2\n");
139 int RenderFarmServerThread::open_client(char *hostname, int port)
141         int socket_fd = -1;
142         int result = 0;
144 // Open file for master node
145         if(hostname[0] == '/')
146         {
147                 if((socket_fd = socket(PF_UNIX, SOCK_STREAM, 0)) < 0)
148                 {
149                         perror(_("RenderFarmServerThread::start_loop: socket\n"));
150                         result = 1;
151                 }
152                 else
153                 {
154                         struct sockaddr_un addr;
155                         addr.sun_family = AF_FILE;
156                         strcpy(addr.sun_path, hostname);
157                         int size = (offsetof(struct sockaddr_un, sun_path) + 
158                                 strlen(hostname) + 1);
160 // The master node is always created by BRender.  Keep trying for 30 seconds.
162 #define ATTEMPT_DELAY 100000
163                         int done = 0;
164                         int attempt = 0;
166                         do
167                         {
168                                 if(connect(socket_fd, (struct sockaddr*)&addr, size) < 0)
169                                 {
170                                         attempt++;
171                                         if(attempt > 30000000 / ATTEMPT_DELAY)
172                                         {
173                                                 fprintf(stderr, _("RenderFarmServerThread::open_client: %s: %s\n"), 
174                                                         hostname, 
175                                                         strerror(errno));
176                                                 result = 1;
177                                         }
178                                         else
179                                                 usleep(ATTEMPT_DELAY);
180                                 }
181                                 else
182                                         done = 1;
183                         }while(!result && !done);
184                 }
185         }
186         else
187 // Open socket
188         {
189                 if((socket_fd = socket(PF_INET, SOCK_STREAM, 0)) < 0)
190                 {
191                         perror(_("RenderFarmServerThread::start_loop: socket"));
192                         result = 1;
193                 }
194                 else
195                 {
196 // Open port
197                         struct sockaddr_in addr;
198                         struct hostent *hostinfo;
199                         addr.sin_family = AF_INET;
200                         addr.sin_port = htons(port);
201                         hostinfo = gethostbyname(hostname);
202                         if(hostinfo == NULL)
203                 {
204                         fprintf(stderr, _("RenderFarmServerThread::open_client: unknown host %s.\n"), 
205                                         hostname);
206                         result = 1;
207                 }
208                         else
209                         {
210                                 addr.sin_addr = *(struct in_addr *) hostinfo->h_addr;   
212                                 if(connect(socket_fd, (struct sockaddr*)&addr, sizeof(addr)) < 0)
213                                 {
214                                         fprintf(stderr, _("RenderFarmServerThread::open_client: %s: %s\n"), 
215                                                 hostname, 
216                                                 strerror(errno));
217                                         result = 1;
218                                 }
219                         }
220                 }
221         }
223         if(result) socket_fd = -1;
225         return socket_fd;
228 int RenderFarmServerThread::start_loop()
230         int result = 0;
232         socket_fd = open_client(server->preferences->get_node_hostname(number), 
233                 server->preferences->get_node_port(number));
235         if(socket_fd < 0) result = 1;
237         if(!result)
238         {
239                 watchdog = new RenderFarmWatchdog(this, 0);
240                 watchdog->start();
241         }
243         if(!result) Thread::start();
245         return result;
258 int64_t RenderFarmServerThread::read_int64(int *error)
260         int temp = 0;
261         if(!error) error = &temp;
263         unsigned char data[sizeof(int64_t)];
264         *error = (read_socket((char*)data, sizeof(int64_t)) != 
265                 sizeof(int64_t));
267 // Make it return 1 if error so it can be used to read a result code from the
268 // server.
269         int64_t result = 1;
270         if(!*error)
271         {
272                 result = (((int64_t)data[0]) << 56) |
273                         (((uint64_t)data[1]) << 48) | 
274                         (((uint64_t)data[2]) << 40) |
275                         (((uint64_t)data[3]) << 32) |
276                         (((uint64_t)data[4]) << 24) |
277                         (((uint64_t)data[5]) << 16) |
278                         (((uint64_t)data[6]) << 8)  |
279                         data[7];
280         }
281         return result;
284 int RenderFarmServerThread::write_int64(int64_t value)
286         unsigned char data[sizeof(int64_t)];
287         data[0] = (value >> 56) & 0xff;
288         data[1] = (value >> 48) & 0xff;
289         data[2] = (value >> 40) & 0xff;
290         data[3] = (value >> 32) & 0xff;
291         data[4] = (value >> 24) & 0xff;
292         data[5] = (value >> 16) & 0xff;
293         data[6] = (value >> 8) & 0xff;
294         data[7] = value & 0xff;
295         return (write_socket((char*)data, sizeof(int64_t)) !=
296                 sizeof(int64_t));
301 int RenderFarmServerThread::read_socket(char *data, int len)
303         int bytes_read = 0;
304         int offset = 0;
305         watchdog->begin_request();
306         while(len > 0 && bytes_read >= 0)
307         {
308                 enable_cancel();
309                 bytes_read = read(socket_fd, data + offset, len);
310                 disable_cancel();
311                 if(bytes_read > 0)
312                 {
313                         len -= bytes_read;
314                         offset += bytes_read;
315                 }
316                 else
317                 if(bytes_read < 0)
318                         break;
319         }
320         watchdog->end_request();
322         return offset;
325 int RenderFarmServerThread::write_socket(char *data, int len)
327         return write(socket_fd, data, len);
330 void RenderFarmServerThread::reallocate_buffer(int size)
332         if(buffer && buffer_allocated < size)
333         {
334                 delete [] buffer;
335                 buffer = 0;
336         }
338         if(!buffer && size)
339         {
340                 buffer = new unsigned char[size];
341                 buffer_allocated = size;
342         }
345 void RenderFarmServerThread::run()
347 // Wait for requests
348         unsigned char header[5];
349         int done = 0;
350         int bytes_read = 0;
353         buffer = 0;
354         buffer_allocated = 0;
355 //      fs_server = new RenderFarmFSServer(this);
356 //      fs_server->initialize();
360 // Send command to run package renderer.
361         write_int64(RENDERFARM_PACKAGES);
365         while(!done)
366         {
368 // Wait for requests.
369 // Requests consist of request ID's and accompanying buffers.
370 // Get request ID.
371                 bytes_read = read_socket((char*)header, 5);
372 //printf("RenderFarmServerThread::run 1\n");
373                 if(bytes_read != 5)
374                 {
375                         done = 1;
376                         continue;
377                 }
379                 int request_id = header[0];
380                 int64_t request_size = (((u_int32_t)header[1]) << 24) |
381                                                         (((u_int32_t)header[2]) << 16) |
382                                                         (((u_int32_t)header[3]) << 8)  |
383                                                         (u_int32_t)header[4];
385                 reallocate_buffer(request_size);
387 // Get accompanying buffer
388                 bytes_read = read_socket((char*)buffer, request_size);
389 //printf("RenderFarmServerThread::run 2 %d %lld %d\n", request_id, request_size, bytes_read);
390                 if(bytes_read != request_size)
391                 {
392                         done = 1;
393                         continue;
394                 }
395 //printf("RenderFarmServerThread::run 3\n");
397                 switch(request_id)
398                 {
399                         case RENDERFARM_PREFERENCES:
400                                 send_preferences();
401                                 break;
402                         
403                         case RENDERFARM_ASSET:
404                                 send_asset();
405                                 break;
406                         
407                         case RENDERFARM_EDL:
408                                 send_edl();
409                                 break;
410                         
411                         case RENDERFARM_PACKAGE:
412                                 send_package(buffer);
413                                 break;
414                         
415                         case RENDERFARM_PROGRESS:
416                                 set_progress(buffer);
417                                 break;
419                         case RENDERFARM_SET_RESULT:
420                                 set_result(buffer);
421                                 break;
423                         case RENDERFARM_SET_VMAP:
424                                 set_video_map(buffer);
425                                 break;
427                         case RENDERFARM_GET_RESULT:
428                                 get_result();
429                                 break;
431                         case RENDERFARM_DONE:
432 //printf("RenderFarmServerThread::run 10\n");
433                                 done = 1;
434                                 break;
436                         case RENDERFARM_KEEPALIVE:
437                                 break;
439                         default:
440 //                              if(!fs_server->handle_request(request_id, request_size, (unsigned char*)buffer))
441                                 {
442                                         printf(_("RenderFarmServerThread::run: unknown request %02x\n"), request_id);
443                                 }
444                                 break;
445                 }
446 //printf("RenderFarmServerThread::run 10 %d %lld\n", request_id, request_size);
447         }
449 // Don't let watchdog kill the entire renderfarm when a client finishes
450 // normally.
451         if(watchdog) 
452         {
453 //printf("RenderFarmServerThread::run 20\n");
454                 delete watchdog;
455                 watchdog = 0;
456         }
458 //      delete fs_server;
461 int RenderFarmServerThread::write_string(char *string)
463         int i, len;
464         i = 0;
466         len = strlen(string) + 1;
467         datagram = new char[len + 4];
468         STORE_INT32(len);
469         memcpy(datagram + i, string, len);
470         write_socket((char*)datagram, len + 4);
471 //printf("RenderFarmServerThread::write_string %02x%02x%02x%02x\n",
472 //      datagram[0], datagram[1], datagram[2], datagram[3]);
474         delete [] datagram;
475         datagram = 0;
478 void RenderFarmServerThread::send_preferences()
480         BC_Hash defaults;
481         char *string;
483         server->preferences->save_defaults(&defaults);
484         defaults.save_string(string);
485         write_string(string);
487         delete [] string;
490 void RenderFarmServerThread::send_asset()
492         BC_Hash defaults;
493         char *string1;
495 // The asset must be sent in two segments.
496 // One segment is stored in the EDL and contains decoding information.
497 // One segment is stored in the asset and contains encoding information.
498         server->default_asset->save_defaults(&defaults, 
499                 0, 
500                 1,
501                 1,
502                 1,
503                 1,
504                 1);
505         defaults.save_string(string1);
507         FileXML file;
508         server->default_asset->write(&file, 0, 0);
509         file.terminate_string();
511         write_string(string1);
512         write_string(file.string);
513         delete [] string1;
517 void RenderFarmServerThread::send_edl()
519         FileXML file;
521 // Save the XML
522         server->edl->save_xml(plugindb,
523                 &file, 
524                 0,
525                 0,
526                 0);
527         file.terminate_string();
528 //printf("RenderFarmServerThread::send_edl\n%s\n\n", file.string);
530         write_string(file.string);
531 //printf("RenderFarmServerThread::send_edl 2\n");
535 void RenderFarmServerThread::send_package(unsigned char *buffer)
537         this->frames_per_second = (double)((((u_int32_t)buffer[0]) << 24) |
538                 (((u_int32_t)buffer[1]) << 16) |
539                 (((u_int32_t)buffer[2]) << 8)  |
540                 ((u_int32_t)buffer[3])) / 
541                 65536.0;
543 //printf("RenderFarmServerThread::send_package 1 %f\n", frames_per_second);
544         RenderPackage *package = 
545                 server->packages->get_package(frames_per_second, 
546                         number, 
547                         server->use_local_rate);
549 //printf("RenderFarmServerThread::send_package 2\n");
550         datagram = new char[BCTEXTLEN];
552 // No more packages
553         if(!package)
554         {
555 //printf("RenderFarmServerThread::send_package 1\n");
556                 datagram[0] = datagram[1] = datagram[2] = datagram[3] = 0;
557                 write_socket(datagram, 4);
558         }
559         else
560 // Encode package
561         {
562 //printf("RenderFarmServerThread::send_package 10\n");
563                 int i = 4;
564                 strcpy(&datagram[i], package->path);
565                 i += strlen(package->path);
566                 datagram[i++] = 0;
568                 STORE_INT32(package->audio_start);
569                 STORE_INT32(package->audio_end);
570                 STORE_INT32(package->video_start);
571                 STORE_INT32(package->video_end);
572                 int use_brender = (server->brender ? 1 : 0);
573                 STORE_INT32(use_brender);
574                 STORE_INT32(package->audio_do);
575                 STORE_INT32(package->video_do);
577                 int len = i;
578                 i = 0;
579                 STORE_INT32(len - 4);
581                 write_socket(datagram, len);
582         }
583         delete [] datagram;
584         datagram = 0;
588 void RenderFarmServerThread::set_progress(unsigned char *buffer)
590         server->total_return_lock->lock("RenderFarmServerThread::set_progress");
591         *server->total_return += (int64_t)(((u_int32_t)buffer[0]) << 24) |
592                                                                                         (((u_int32_t)buffer[1]) << 16) |
593                                                                                         (((u_int32_t)buffer[2]) << 8)  |
594                                                                                         ((u_int32_t)buffer[3]);
595         server->total_return_lock->unlock();
598 int RenderFarmServerThread::set_video_map(unsigned char *buffer)
600         if(server->brender)
601         {
602                 server->brender->set_video_map((int64_t)(((u_int32_t)buffer[0]) << 24) |
603                                                         (((u_int32_t)buffer[1]) << 16) |
604                                                         (((u_int32_t)buffer[2]) << 8)  |
605                                                         ((u_int32_t)buffer[3]),
606                                                         (int64_t)(((u_int32_t)buffer[4]) << 24) |
607                                                         (((u_int32_t)buffer[5]) << 16) |
608                                                         (((u_int32_t)buffer[6]) << 8)  |
609                                                         ((u_int32_t)buffer[7]));
610                 char return_value[1];
611                 return_value[0] = 0;
612                 write_socket(return_value, 1);
613                 return 0;
614         }
615         return 1;
619 void RenderFarmServerThread::set_result(unsigned char *buffer)
621 //printf("RenderFarmServerThread::set_result %p\n", buffer);
622         if(!*server->result_return)
623                 *server->result_return = buffer[0];
627 void RenderFarmServerThread::get_result()
629         unsigned char data[1];
630         data[0] = *server->result_return;
631         write_socket((char*)data, 1);
647 RenderFarmWatchdog::RenderFarmWatchdog(
648         RenderFarmServerThread *server,
649         RenderFarmClientThread *client)
650  : Thread(1, 0, 0)
652         this->server = server;
653         this->client = client;
654         next_request = new Condition(0, "RenderFarmWatchdog::next_request", 0);
655         request_complete = new Condition(0, "RenderFarmWatchdog::request_complete", 0);
656         done = 0;
659 RenderFarmWatchdog::~RenderFarmWatchdog()
661         done = 1;
662         next_request->unlock();
663         request_complete->unlock();
664         join();
665         delete next_request;
666         delete request_complete;
669 void RenderFarmWatchdog::begin_request()
671         next_request->unlock();
674 void RenderFarmWatchdog::end_request()
676         request_complete->unlock();
679 void RenderFarmWatchdog::run()
681         while(!done)
682         {
683                 next_request->lock("RenderFarmWatchdog::run");
685                 int result = request_complete->timed_lock(RENDERFARM_TIMEOUT * 1000000, 
686                         "RenderFarmWatchdog::run");
688                 if(result)
689                 {
690                         if(client)
691                         {
692                                 printf("RenderFarmWatchdog::run 1 killing pid %d\n", client->pid);
693 //                              client->cancel();
694                                 kill(client->pid, SIGKILL);
695                         }
696                         else
697                         if(server)
698                         {
699                                 printf("RenderFarmWatchdog::run 1 killing thread %p\n", server);
700                                 server->cancel();
701                                 unsigned char buffer[4];
702                                 buffer[0] = 1;
703                                 server->set_result(buffer);
704                         }
706                         done = 1;
707                 }
708         }