mySQL 5.0.11 sources for tomato
[tomato.git] / release / src / router / mysql / storage / ndb / src / ndbapi / NdbPoolImpl.cpp
blob77e6c4fa53cedfb0c62e7501ff3e687bd94b65ff
1 /* Copyright (c) 2003-2005 MySQL AB
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
16 #include "NdbPoolImpl.hpp"
18 NdbMutex *NdbPool::pool_mutex = NULL;
19 NdbPool *the_pool = NULL;
21 NdbPool*
22 NdbPool::create_instance(Ndb_cluster_connection* cc,
23 Uint32 max_ndb_obj,
24 Uint32 no_conn_obj,
25 Uint32 init_no_ndb_objects)
27 if (!initPoolMutex()) {
28 return NULL;
30 NdbMutex_Lock(pool_mutex);
31 NdbPool* a_pool;
32 if (the_pool != NULL) {
33 a_pool = NULL;
34 } else {
35 the_pool = new NdbPool(cc, max_ndb_obj, no_conn_obj);
36 if (!the_pool->init(init_no_ndb_objects)) {
37 delete the_pool;
38 the_pool = NULL;
40 a_pool = the_pool;
42 NdbMutex* temp = pool_mutex;
43 if (a_pool == NULL) {
44 pool_mutex = NULL;
46 NdbMutex_Unlock(pool_mutex);
47 if (a_pool == NULL) {
48 NdbMutex_Destroy(temp);
50 return a_pool;
53 void
54 NdbPool::drop_instance()
56 if (pool_mutex == NULL) {
57 return;
59 NdbMutex_Lock(pool_mutex);
60 the_pool->release_all();
61 delete the_pool;
62 the_pool = NULL;
63 NdbMutex* temp = pool_mutex;
64 NdbMutex_Unlock(temp);
65 NdbMutex_Destroy(temp);
68 bool
69 NdbPool::initPoolMutex()
71 bool ret_result = false;
72 if (pool_mutex == NULL) {
73 pool_mutex = NdbMutex_Create();
74 ret_result = ((pool_mutex == NULL) ? false : true);
76 return ret_result;
79 NdbPool::NdbPool(Ndb_cluster_connection* cc,
80 Uint32 max_no_objects,
81 Uint32 no_conn_objects)
83 if (no_conn_objects > 1024) {
84 no_conn_objects = 1024;
86 if (max_no_objects > MAX_NDB_OBJECTS) {
87 max_no_objects = MAX_NDB_OBJECTS;
88 } else if (max_no_objects == 0) {
89 max_no_objects = 1;
91 m_max_ndb_objects = max_no_objects;
92 m_no_of_conn_objects = no_conn_objects;
93 m_no_of_objects = 0;
94 m_waiting = 0;
95 m_pool_reference = NULL;
96 m_hash_entry = NULL;
97 m_first_free = NULL_POOL;
98 m_first_not_in_use = NULL_POOL;
99 m_last_free = NULL_POOL;
100 input_pool_cond = NULL;
101 output_pool_cond = NULL;
102 m_output_queue = 0;
103 m_input_queue = 0;
104 m_signal_count = 0;
105 m_cluster_connection = cc;
108 NdbPool::~NdbPool()
110 NdbCondition_Destroy(input_pool_cond);
111 NdbCondition_Destroy(output_pool_cond);
114 void
115 NdbPool::release_all()
117 int i;
118 for (i = 0; i < m_max_ndb_objects + 1; i++) {
119 if (m_pool_reference[i].ndb_reference != NULL) {
120 assert(m_pool_reference[i].in_use);
121 assert(m_pool_reference[i].free_entry);
122 delete m_pool_reference[i].ndb_reference;
125 delete [] m_pool_reference;
126 delete [] m_hash_entry;
127 m_pool_reference = NULL;
128 m_hash_entry = NULL;
131 bool
132 NdbPool::init(Uint32 init_no_objects)
134 bool ret_result = false;
135 int i;
136 do {
137 input_pool_cond = NdbCondition_Create();
138 output_pool_cond = NdbCondition_Create();
139 if (input_pool_cond == NULL || output_pool_cond == NULL) {
140 break;
142 if (init_no_objects > m_max_ndb_objects) {
143 init_no_objects = m_max_ndb_objects;
145 if (init_no_objects == 0) {
146 init_no_objects = 1;
148 m_pool_reference = new NdbPool::POOL_STRUCT[m_max_ndb_objects + 1];
149 m_hash_entry = new Uint8[POOL_HASH_TABLE_SIZE];
151 if ((m_pool_reference == NULL) || (m_hash_entry == NULL)) {
152 delete [] m_pool_reference;
153 delete [] m_hash_entry;
154 break;
156 for (i = 0; i < m_max_ndb_objects + 1; i++) {
157 m_pool_reference[i].ndb_reference = NULL;
158 m_pool_reference[i].in_use = false;
159 m_pool_reference[i].next_free_object = i+1;
160 m_pool_reference[i].prev_free_object = i-1;
161 m_pool_reference[i].next_db_object = NULL_POOL;
162 m_pool_reference[i].prev_db_object = NULL_POOL;
164 for (i = 0; i < POOL_HASH_TABLE_SIZE; i++) {
165 m_hash_entry[i] = NULL_HASH;
167 m_pool_reference[m_max_ndb_objects].next_free_object = NULL_POOL;
168 m_pool_reference[1].prev_free_object = NULL_POOL;
169 m_first_not_in_use = 1;
170 m_no_of_objects = init_no_objects;
171 for (i = init_no_objects; i > 0 ; i--) {
172 Uint32 fake_id;
173 if (!allocate_ndb(fake_id, (const char*)NULL, (const char*)NULL)) {
174 release_all();
175 break;
178 ret_result = true;
179 break;
180 } while (1);
181 return ret_result;
185 Get an Ndb object.
186 Input:
187 hint_id: 0 = no hint, otherwise a hint of which Ndb object the thread
188 used the last time.
189 a_db_name: NULL = don't check for database specific Ndb object, otherwise
190 a hint of which database is preferred.
191 Output:
192 hint_id: Returns id of Ndb object returned
193 Return value: Ndb object pointer
195 Ndb*
196 NdbPool::get_ndb_object(Uint32 &hint_id,
197 const char* a_catalog_name,
198 const char* a_schema_name)
200 Ndb* ret_ndb = NULL;
201 Uint32 hash_entry = compute_hash(a_schema_name);
202 NdbMutex_Lock(pool_mutex);
203 while (1) {
205 We start by checking if we can use the hinted Ndb object.
207 if ((ret_ndb = get_hint_ndb(hint_id, hash_entry)) != NULL) {
208 break;
211 The hinted Ndb object was not free. We need to allocate another object.
212 We start by checking for a free Ndb object connected to the same database.
214 if (a_schema_name && (ret_ndb = get_db_hash(hint_id,
215 hash_entry,
216 a_catalog_name,
217 a_schema_name))) {
218 break;
221 No Ndb object connected to the preferred database was found.
222 We look for a free Ndb object in general.
224 if ((ret_ndb = get_free_list(hint_id, hash_entry)) != NULL) {
225 break;
228 No free Ndb object was found. If we haven't allocated objects up until the
229 maximum number yet then we can allocate a new Ndb object here.
231 if (m_no_of_objects < m_max_ndb_objects) {
232 if (allocate_ndb(hint_id, a_catalog_name, a_schema_name)) {
233 assert((ret_ndb = get_hint_ndb(hint_id, hash_entry)) != NULL);
234 break;
238 We need to wait until an Ndb object becomes
239 available.
241 if ((ret_ndb = wait_free_ndb(hint_id)) != NULL) {
242 break;
245 Not even after waiting were we able to get hold of an Ndb object. We
246 return NULL to indicate this problem.
248 ret_ndb = NULL;
249 break;
251 NdbMutex_Unlock(pool_mutex);
252 if (ret_ndb != NULL) {
254 We need to set the catalog and schema name of the Ndb object before
255 returning it to the caller.
257 ret_ndb->setCatalogName(a_catalog_name);
258 ret_ndb->setSchemaName(a_schema_name);
260 return ret_ndb;
263 void
264 NdbPool::return_ndb_object(Ndb* returned_ndb, Uint32 id)
266 NdbMutex_Lock(pool_mutex);
267 assert(id <= m_max_ndb_objects);
268 assert(id != 0);
269 assert(returned_ndb == m_pool_reference[id].ndb_reference);
270 bool wait_cond = m_waiting;
271 if (wait_cond) {
272 NdbCondition* pool_cond;
273 if (m_signal_count > 0) {
274 pool_cond = output_pool_cond;
275 m_signal_count--;
276 } else {
277 pool_cond = input_pool_cond;
279 add_wait_list(id);
280 NdbMutex_Unlock(pool_mutex);
281 NdbCondition_Signal(pool_cond);
282 } else {
283 add_free_list(id);
284 add_db_hash(id);
285 NdbMutex_Unlock(pool_mutex);
289 bool
290 NdbPool::allocate_ndb(Uint32 &id,
291 const char* a_catalog_name,
292 const char* a_schema_name)
294 Ndb* a_ndb;
295 if (m_first_not_in_use == NULL_POOL) {
296 return false;
298 if (a_schema_name) {
299 a_ndb = new Ndb(m_cluster_connection, a_schema_name, a_catalog_name);
300 } else {
301 a_ndb = new Ndb(m_cluster_connection, "");
303 if (a_ndb == NULL) {
304 return false;
306 a_ndb->init(m_no_of_conn_objects);
307 m_no_of_objects++;
309 id = m_first_not_in_use;
310 Uint32 allocated_id = m_first_not_in_use;
311 m_first_not_in_use = m_pool_reference[allocated_id].next_free_object;
313 m_pool_reference[allocated_id].ndb_reference = a_ndb;
314 m_pool_reference[allocated_id].in_use = true;
315 m_pool_reference[allocated_id].free_entry = false;
317 add_free_list(allocated_id);
318 add_db_hash(allocated_id);
319 return true;
322 void
323 NdbPool::add_free_list(Uint32 id)
325 assert(!m_pool_reference[id].free_entry);
326 assert(m_pool_reference[id].in_use);
327 m_pool_reference[id].free_entry = true;
328 m_pool_reference[id].next_free_object = m_first_free;
329 m_pool_reference[id].prev_free_object = (Uint8)NULL_POOL;
330 m_first_free = (Uint8)id;
331 if (m_last_free == (Uint8)NULL_POOL) {
332 m_last_free = (Uint8)id;
336 void
337 NdbPool::add_db_hash(Uint32 id)
339 Ndb* t_ndb = m_pool_reference[id].ndb_reference;
340 const char* schema_name = t_ndb->getSchemaName();
341 Uint32 hash_entry = compute_hash(schema_name);
342 Uint8 next_db_entry = m_hash_entry[hash_entry];
343 m_pool_reference[id].next_db_object = next_db_entry;
344 m_pool_reference[id].prev_db_object = (Uint8)NULL_HASH;
345 m_hash_entry[hash_entry] = (Uint8)id;
348 Ndb*
349 NdbPool::get_free_list(Uint32 &id, Uint32 hash_entry)
351 if (m_first_free == NULL_POOL) {
352 return NULL;
354 id = m_first_free;
355 Ndb* ret_ndb = get_hint_ndb(m_first_free, hash_entry);
356 assert(ret_ndb != NULL);
357 return ret_ndb;
360 Ndb*
361 NdbPool::get_db_hash(Uint32 &id,
362 Uint32 hash_entry,
363 const char *a_catalog_name,
364 const char *a_schema_name)
366 Uint32 entry_id = m_hash_entry[hash_entry];
367 bool found = false;
368 while (entry_id != NULL_HASH) {
369 Ndb* t_ndb = m_pool_reference[entry_id].ndb_reference;
370 const char *a_ndb_catalog_name = t_ndb->getCatalogName();
371 if (strcmp(a_catalog_name, a_ndb_catalog_name) == 0) {
372 const char *a_ndb_schema_name = t_ndb->getSchemaName();
373 if (strcmp(a_schema_name, a_ndb_schema_name) == 0) {
374 found = true;
375 break;
378 entry_id = m_pool_reference[entry_id].next_db_object;
380 if (found) {
381 id = entry_id;
382 Ndb* ret_ndb = get_hint_ndb(entry_id, hash_entry);
383 assert(ret_ndb != NULL);
384 return ret_ndb;
386 return NULL;
389 Ndb*
390 NdbPool::get_hint_ndb(Uint32 hint_id, Uint32 hash_entry)
392 Ndb* ret_ndb = NULL;
393 do {
394 if ((hint_id != 0) &&
395 (hint_id <= m_max_ndb_objects) &&
396 (m_pool_reference[hint_id].in_use) &&
397 (m_pool_reference[hint_id].free_entry)) {
398 ret_ndb = m_pool_reference[hint_id].ndb_reference;
399 if (ret_ndb != NULL) {
400 break;
401 } else {
402 assert(false);
405 return NULL;
406 } while (1);
408 This is where we remove the entry from the free list and from the db hash
409 table.
411 remove_free_list(hint_id);
412 remove_db_hash(hint_id, hash_entry);
413 return ret_ndb;
416 void
417 NdbPool::remove_free_list(Uint32 id)
419 Uint8 next_free_entry = m_pool_reference[id].next_free_object;
420 Uint8 prev_free_entry = m_pool_reference[id].prev_free_object;
421 if (prev_free_entry == (Uint8)NULL_POOL) {
422 m_first_free = next_free_entry;
423 } else {
424 m_pool_reference[prev_free_entry].next_free_object = next_free_entry;
426 if (next_free_entry == (Uint8)NULL_POOL) {
427 m_last_free = prev_free_entry;
428 } else {
429 m_pool_reference[next_free_entry].prev_free_object = prev_free_entry;
431 m_pool_reference[id].next_free_object = NULL_POOL;
432 m_pool_reference[id].prev_free_object = NULL_POOL;
433 m_pool_reference[id].free_entry = false;
436 void
437 NdbPool::remove_db_hash(Uint32 id, Uint32 hash_entry)
439 Uint8 next_free_entry = m_pool_reference[id].next_db_object;
440 Uint8 prev_free_entry = m_pool_reference[id].prev_db_object;
441 if (prev_free_entry == (Uint8)NULL_HASH) {
442 m_hash_entry[hash_entry] = next_free_entry;
443 } else {
444 m_pool_reference[prev_free_entry].next_db_object = next_free_entry;
446 if (next_free_entry == (Uint8)NULL_HASH) {
448 } else {
449 m_pool_reference[next_free_entry].prev_db_object = prev_free_entry;
451 m_pool_reference[id].next_db_object = NULL_HASH;
452 m_pool_reference[id].prev_db_object = NULL_HASH;
455 Uint32
456 NdbPool::compute_hash(const char *a_schema_name)
458 Uint32 len = strlen(a_schema_name);
459 Uint32 h = 147;
460 for (Uint32 i = 0; i < len; i++) {
461 Uint32 c = a_schema_name[i];
462 h = (h << 5) + h + c;
464 h &= (POOL_HASH_TABLE_SIZE - 1);
465 return h;
468 Ndb*
469 NdbPool::wait_free_ndb(Uint32 &id)
471 int res;
472 int time_out = 3500;
473 do {
474 NdbCondition* tmp = input_pool_cond;
475 m_waiting++;
476 m_input_queue++;
477 time_out -= 500;
478 res = NdbCondition_WaitTimeout(input_pool_cond, pool_mutex, time_out);
479 if (tmp == input_pool_cond) {
480 m_input_queue--;
481 } else {
482 m_output_queue--;
483 if (m_output_queue == 0) {
484 switch_condition_queue();
487 m_waiting--;
488 } while (res == 0 && m_first_wait == NULL_POOL);
489 if (res != 0 && m_first_wait == NULL_POOL) {
490 return NULL;
492 id = m_first_wait;
493 remove_wait_list();
494 assert(m_waiting != 0 || m_first_wait == NULL_POOL);
495 return m_pool_reference[id].ndb_reference;
498 void
499 NdbPool::remove_wait_list()
501 Uint32 id = m_first_wait;
502 m_first_wait = m_pool_reference[id].next_free_object;
503 m_pool_reference[id].next_free_object = NULL_POOL;
504 m_pool_reference[id].prev_free_object = NULL_POOL;
505 m_pool_reference[id].free_entry = false;
508 void
509 NdbPool::add_wait_list(Uint32 id)
511 m_pool_reference[id].next_free_object = m_first_wait;
512 m_first_wait = id;
515 void
516 NdbPool::switch_condition_queue()
518 m_signal_count = m_input_queue;
519 Uint8 move_queue = m_input_queue;
520 m_input_queue = m_output_queue;
521 m_output_queue = move_queue;
523 NdbCondition* move_cond = input_pool_cond;
524 input_pool_cond = output_pool_cond;
525 output_pool_cond = move_cond;