r999: maintainers added to README_en.
[cinelerra_cv/mob.git] / cinelerra / loadbalance.C
blob53e674f4c5c3d61eb2c0bbcd2555f32d72de3880
1 #include "condition.h"
2 #include "mutex.h"
3 #include "loadbalance.h"
8 LoadPackage::LoadPackage()
10         completion_lock = new Condition(0, "LoadPackage::completion_lock");
12 LoadPackage::~LoadPackage()
14         delete completion_lock;
25 LoadClient::LoadClient(LoadServer *server)
26  : Thread()
28         Thread::set_synchronous(1);
29         this->server = server;
30         done = 0;
31         package_number = 0;
32         input_lock = new Condition(0, "LoadClient::input_lock");
33         completion_lock = new Condition(0, "LoadClient::completion_lock");
36 LoadClient::LoadClient()
37  : Thread()
39         Thread::set_synchronous(1);
40         server = 0;
41         done = 0;
42         package_number = 0;
43         input_lock = new Condition(0, "LoadClient::input_lock");
44         completion_lock = new Condition(0, "LoadClient::completion_lock");
47 LoadClient::~LoadClient()
49         done = 1;
50         input_lock->unlock();
51         Thread::join();
52         delete input_lock;
53         delete completion_lock;
56 int LoadClient::get_package_number()
58         return package_number;
61 LoadServer* LoadClient::get_server()
63         return server;
67 void LoadClient::run()
69         while(!done)
70         {
71                 input_lock->lock("LoadClient::run");
73                 if(!done)
74                 {
75 // Read packet
76                         LoadPackage *package;
77                         
78                         
79                         server->client_lock->lock("LoadClient::run");
80                         if(server->current_package < server->total_packages)
81                         {
82                                 package_number = server->current_package;
83                                 package = server->packages[server->current_package++];
84                                 server->client_lock->unlock();
85                                 input_lock->unlock();
87                                 process_package(package);
89                                 package->completion_lock->unlock();
90                         }
91                         else
92                         {
93                                 completion_lock->unlock();
94                                 server->client_lock->unlock();
95                         }
96                 }
97         }
100 void LoadClient::run_single()
102         if(server->total_packages)
103                 process_package(server->packages[0]);
106 void LoadClient::process_package(LoadPackage *package)
108         printf("LoadClient::process_package\n");
115 LoadServer::LoadServer(int total_clients, int total_packages)
117         if(total_clients <= 0)
118                 printf("LoadServer::LoadServer total_clients == %d\n", total_clients);
119         this->total_clients = total_clients;
120         this->total_packages = total_packages;
121         current_package = 0;
122         clients = 0;
123         packages = 0;
124         client_lock = new Mutex("LoadServer::client_lock");
125         is_single = 0;
126         single_client = 0;
129 LoadServer::~LoadServer()
131         delete_clients();
132         delete_packages();
133         delete client_lock;
136 void LoadServer::delete_clients()
138         if(clients)
139         {
140                 for(int i = 0; i < total_clients; i++)
141                         delete clients[i];
142                 delete [] clients;
143         }
144         if(single_client) delete single_client;
145         clients = 0;
146         single_client = 0;
149 void LoadServer::delete_packages()
151         if(packages)
152         {
153                 for(int i = 0; i < total_packages; i++)
154                         delete packages[i];
155                 delete [] packages;
156         }
157         packages = 0;
160 void LoadServer::set_package_count(int total_packages)
162         delete_packages();
163         this->total_packages = total_packages;
164         create_packages();
168 void LoadServer::create_clients()
170         if(!is_single && !clients)
171         {
172                 clients = new LoadClient*[total_clients];
173                 for(int i = 0; i < total_clients; i++)
174                 {
175                         clients[i] = new_client();
176                         clients[i]->server = this;
177                         clients[i]->start();
178                 }
179         }
181         if(is_single && !single_client)
182         {
183                 single_client = new_client();
184                 single_client->server = this;
185         }
188 void LoadServer::create_packages()
190         if(!packages)
191         {
192                 packages = new LoadPackage*[total_packages];
193                 for(int i = 0; i < total_packages; i++)
194                         packages[i] = new_package();
195         }
198 LoadPackage* LoadServer::get_package(int number)
200         return packages[number];
203 LoadClient* LoadServer::get_client(int number)
205         return clients[number];
208 int LoadServer::get_total_packages()
210         if(is_single) return 1;
211         return total_packages;
214 int LoadServer::get_total_clients()
216         if(is_single) return 1;
217         return total_clients;
220 void LoadServer::process_packages()
222         is_single = 0;
223         create_clients();
224         create_packages();
225         
226         
227         
228 // Set up packages
229         init_packages();
231         current_package = 0;
232 // Start all clients
233         for(int i = 0; i < total_clients; i++)
234         {
235                 clients[i]->input_lock->unlock();
236         }
237         
238 // Wait for packages to get finished
239         for(int i = 0; i < total_packages; i++)
240         {
241                 packages[i]->completion_lock->lock("LoadServer::process_packages 1");
242         }
244 // Wait for clients to finish before allowing changes to packages
245         for(int i = 0; i < total_clients; i++)
246         {
247                 clients[i]->completion_lock->lock("LoadServer::process_packages 2");
248         }
251 void LoadServer::process_single()
253         is_single = 1;
254         create_clients();
255         create_packages();
256         init_packages();
257         current_package = 0;
258         single_client->run_single();