r663: This commit was generated by cvs2svn to compensate for changes in r662,
[cinelerra_cv.git] / cinelerra / loadbalance.C
blob66a6e99f5dccc502ce8163c680e998896cfa0d5c
1 #include "condition.h"
2 #include "mutex.h"
3 #include "loadbalance.h"
8 LoadPackage::LoadPackage()
10         completion_lock = new Condition(0, "LoadPackage::completion_lock");
12 LoadPackage::~LoadPackage()
14         delete completion_lock;
25 LoadClient::LoadClient(LoadServer *server)
26  : Thread()
28         Thread::set_synchronous(1);
29         this->server = server;
30         done = 0;
31         package_number = 0;
32         input_lock = new Condition(0, "LoadClient::input_lock");
33         completion_lock = new Condition(0, "LoadClient::completion_lock");
36 LoadClient::LoadClient()
37  : Thread()
39         Thread::set_synchronous(1);
40         server = 0;
41         done = 0;
42         package_number = 0;
43         input_lock = new Condition(0, "LoadClient::input_lock");
44         completion_lock = new Condition(0, "LoadClient::completion_lock");
47 LoadClient::~LoadClient()
49         done = 1;
50         input_lock->unlock();
51         Thread::join();
52         delete input_lock;
53         delete completion_lock;
56 int LoadClient::get_package_number()
58         return package_number;
61 LoadServer* LoadClient::get_server()
63         return server;
67 void LoadClient::run()
69         while(!done)
70         {
71                 input_lock->lock("LoadClient::run");
73                 if(!done)
74                 {
75 // Read packet
76                         LoadPackage *package;
77                         
78                         
79                         server->client_lock->lock("LoadClient::run");
80                         if(server->current_package < server->total_packages)
81                         {
82                                 package_number = server->current_package;
83                                 package = server->packages[server->current_package++];
84                                 server->client_lock->unlock();
85                                 input_lock->unlock();
87                                 process_package(package);
89                                 package->completion_lock->unlock();
90                         }
91                         else
92                         {
93                                 completion_lock->unlock();
94                                 server->client_lock->unlock();
95                         }
96                 }
97         }
107 LoadServer::LoadServer(int total_clients, int total_packages)
109         if(total_clients <= 0)
110                 printf("LoadServer::LoadServer total_clients == %d\n", total_clients);
111         this->total_clients = total_clients;
112         this->total_packages = total_packages;
113         current_package = 0;
114         clients = 0;
115         packages = 0;
116         client_lock = new Mutex("LoadServer::client_lock");
119 LoadServer::~LoadServer()
121         delete_clients();
122         delete_packages();
123         delete client_lock;
126 void LoadServer::delete_clients()
128         if(clients)
129         {
130                 for(int i = 0; i < total_clients; i++)
131                         delete clients[i];
132                 delete [] clients;
133         }
134         clients = 0;
137 void LoadServer::delete_packages()
139         if(packages)
140         {
141                 for(int i = 0; i < total_packages; i++)
142                         delete packages[i];
143                 delete [] packages;
144         }
145         packages = 0;
148 void LoadServer::set_package_count(int total_packages)
150         delete_packages();
151         this->total_packages = total_packages;
152         create_packages();
156 void LoadServer::create_clients()
158         if(!clients)
159         {
160                 clients = new LoadClient*[total_clients];
161                 for(int i = 0; i < total_clients; i++)
162                 {
163                         clients[i] = new_client();
164                         clients[i]->server = this;
165                         clients[i]->start();
166                 }
167         }
170 void LoadServer::create_packages()
172         if(!packages)
173         {
174                 packages = new LoadPackage*[total_packages];
175                 for(int i = 0; i < total_packages; i++)
176                         packages[i] = new_package();
177         }
180 LoadPackage* LoadServer::get_package(int number)
182         return packages[number];
185 LoadClient* LoadServer::get_client(int number)
187         return clients[number];
190 int LoadServer::get_total_packages()
192         return total_packages;
195 int LoadServer::get_total_clients()
197         return total_clients;
200 void LoadServer::process_packages()
202         if(!clients) create_clients();
203         if(!packages) create_packages();
204         
205         
206         
207 // Set up packages
208         init_packages();
210         current_package = 0;
211 // Start all clients
212         for(int i = 0; i < total_clients; i++)
213         {
214                 clients[i]->input_lock->unlock();
215         }
216         
217 // Wait for packages to get finished
218         for(int i = 0; i < total_packages; i++)
219         {
220                 packages[i]->completion_lock->lock("LoadServer::process_packages 1");
221         }
223 // Wait for clients to finish before allowing changes to packages
224         for(int i = 0; i < total_clients; i++)
225         {
226                 clients[i]->completion_lock->lock("LoadServer::process_packages 2");
227         }