mySQL 5.0.11 sources for tomato
[tomato.git] / release / src / router / mysql / sql / ha_ndbcluster_binlog.cc
blob66530a64bd71c12845497fd7bc3c856b3184d3b6
1 /* Copyright (c) 2006, 2012, Oracle and/or its affiliates. All rights reserved.
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
16 #include "mysql_priv.h"
17 #include "sql_show.h"
18 #ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
19 #include "ha_ndbcluster.h"
21 #ifdef HAVE_NDB_BINLOG
22 #include "rpl_injector.h"
23 #include "rpl_filter.h"
24 #include "slave.h"
25 #include "log_event.h"
26 #include "ha_ndbcluster_binlog.h"
27 #include "NdbDictionary.hpp"
28 #include "ndb_cluster_connection.hpp"
29 #include <util/NdbAutoPtr.hpp>
31 #ifdef ndb_dynamite
32 #undef assert
33 #define assert(x) do { if(x) break; ::printf("%s %d: assert failed: %s\n", __FILE__, __LINE__, #x); ::fflush(stdout); ::signal(SIGABRT,SIG_DFL); ::abort(); ::kill(::getpid(),6); ::kill(::getpid(),9); } while (0)
34 #endif
37 defines for cluster replication table names
39 #include "ha_ndbcluster_tables.h"
40 #define NDB_APPLY_TABLE_FILE "./" NDB_REP_DB "/" NDB_APPLY_TABLE
41 #define NDB_SCHEMA_TABLE_FILE "./" NDB_REP_DB "/" NDB_SCHEMA_TABLE
44 Timeout for syncing schema events between
45 mysql servers, and between mysql server and the binlog
47 const int opt_ndb_sync_timeout= 120;
50 Flag showing if the ndb injector thread is running, if so == 1
51 -1 if it was started but later stopped for some reason
52 0 if never started
54 int ndb_binlog_thread_running= 0;
56 Flag showing if the ndb binlog should be created, if so == TRUE
57 FALSE if not
59 my_bool ndb_binlog_running= FALSE;
60 my_bool ndb_binlog_tables_inited= FALSE;
63 Global reference to the ndb injector thread THD object
65 Has one sole purpose, for setting the in_use table member variable
66 in get_share(...)
68 THD *injector_thd= 0;
71 Global reference to ndb injector thd object.
73 Used mainly by the binlog index thread, but exposed to the client sql
74 thread for one reason; to setup the events operations for a table
75 to enable ndb injector thread receiving events.
77 Must therefore always be used with a surrounding
78 pthread_mutex_lock(&injector_mutex), when doing create/dropEventOperation
80 static Ndb *injector_ndb= 0;
81 static Ndb *schema_ndb= 0;
83 static int ndbcluster_binlog_inited= 0;
85 Flag "ndbcluster_binlog_terminating" set when shutting down mysqld.
86 Server main loop should call handlerton function:
88 ndbcluster_hton->binlog_func ==
89 ndbcluster_binlog_func(...,BFN_BINLOG_END,...) ==
90 ndbcluster_binlog_end
92 at shutdown, which sets the flag. And then server needs to wait for it
93 to complete. Otherwise binlog will not be complete.
95 ndbcluster_hton->panic == ndbcluster_end() will not return until
96 ndb binlog is completed
98 static int ndbcluster_binlog_terminating= 0;
101 Mutex and condition used for interacting between client sql thread
102 and injector thread
104 pthread_t ndb_binlog_thread;
105 pthread_mutex_t injector_mutex;
106 pthread_cond_t injector_cond;
108 /* NDB Injector thread (used for binlog creation) */
109 static ulonglong ndb_latest_applied_binlog_epoch= 0;
110 static ulonglong ndb_latest_handled_binlog_epoch= 0;
111 static ulonglong ndb_latest_received_binlog_epoch= 0;
113 NDB_SHARE *ndb_apply_status_share= 0;
114 NDB_SHARE *ndb_schema_share= 0;
115 pthread_mutex_t ndb_schema_share_mutex;
117 extern my_bool opt_log_slave_updates;
118 static my_bool g_ndb_log_slave_updates;
120 /* Schema object distribution handling */
121 HASH ndb_schema_objects;
/*
  Entry type for the ndb_schema_objects hash (schema object distribution
  handling).  Instances are obtained with ndb_get_schema_object() and
  released with ndb_free_schema_object().
*/
typedef struct st_ndb_schema_object {
  pthread_mutex_t mutex;   // NOTE(review): presumably guards the slock fields - confirm
  char *key;               // lookup key, as passed to ndb_get_schema_object()
  uint key_length;         // length of key
  uint use_count;          // NOTE(review): looks like a reference count - confirm
  MY_BITMAP slock_bitmap;
  uint32 slock[256/32]; // 256 bits for lock status of table
} NDB_SCHEMA_OBJECT;
130 static NDB_SCHEMA_OBJECT *ndb_get_schema_object(const char *key,
131 my_bool create_if_not_exists,
132 my_bool have_lock);
133 static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object,
134 bool have_lock);
136 static Uint64 *p_latest_trans_gci= 0;
139 Global variables for holding the ndb_binlog_index table reference
141 static TABLE *ndb_binlog_index= 0;
142 static TABLE_LIST binlog_tables;
145 Helper functions
148 #ifndef DBUG_OFF
149 /* purecov: begin deadcode */
150 static void print_records(TABLE *table, const uchar *record)
152 for (uint j= 0; j < table->s->fields; j++)
154 char buf[40];
155 int pos= 0;
156 Field *field= table->field[j];
157 const uchar* field_ptr= field->ptr - table->record[0] + record;
158 int pack_len= field->pack_length();
159 int n= pack_len < 10 ? pack_len : 10;
161 for (int i= 0; i < n && pos < 20; i++)
163 pos+= sprintf(&buf[pos]," %x", (int) (uchar) field_ptr[i]);
165 buf[pos]= 0;
166 DBUG_PRINT("info",("[%u]field_ptr[0->%d]: %s", j, n, buf));
169 /* purecov: end */
170 #else
171 #define print_records(a,b)
172 #endif
175 #ifndef DBUG_OFF
176 static void dbug_print_table(const char *info, TABLE *table)
178 if (table == 0)
180 DBUG_PRINT("info",("%s: (null)", info));
181 return;
183 DBUG_PRINT("info",
184 ("%s: %s.%s s->fields: %d "
185 "reclength: %lu rec_buff_length: %u record[0]: 0x%lx "
186 "record[1]: 0x%lx",
187 info,
188 table->s->db.str,
189 table->s->table_name.str,
190 table->s->fields,
191 table->s->reclength,
192 table->s->rec_buff_length,
193 (long) table->record[0],
194 (long) table->record[1]));
196 for (unsigned int i= 0; i < table->s->fields; i++)
198 Field *f= table->field[i];
199 DBUG_PRINT("info",
200 ("[%d] \"%s\"(0x%lx:%s%s%s%s%s%s) type: %d pack_length: %d "
201 "ptr: 0x%lx[+%d] null_bit: %u null_ptr: 0x%lx[+%d]",
203 f->field_name,
204 (long) f->flags,
205 (f->flags & PRI_KEY_FLAG) ? "pri" : "attr",
206 (f->flags & NOT_NULL_FLAG) ? "" : ",nullable",
207 (f->flags & UNSIGNED_FLAG) ? ",unsigned" : ",signed",
208 (f->flags & ZEROFILL_FLAG) ? ",zerofill" : "",
209 (f->flags & BLOB_FLAG) ? ",blob" : "",
210 (f->flags & BINARY_FLAG) ? ",binary" : "",
211 f->real_type(),
212 f->pack_length(),
213 (long) f->ptr, (int) (f->ptr - table->record[0]),
214 f->null_bit,
215 (long) f->null_ptr,
216 (int) ((uchar*) f->null_ptr - table->record[0])));
217 if (f->type() == MYSQL_TYPE_BIT)
219 Field_bit *g= (Field_bit*) f;
220 DBUG_PRINT("MYSQL_TYPE_BIT",("field_length: %d bit_ptr: 0x%lx[+%d] "
221 "bit_ofs: %d bit_len: %u",
222 g->field_length, (long) g->bit_ptr,
223 (int) ((uchar*) g->bit_ptr -
224 table->record[0]),
225 g->bit_ofs, g->bit_len));
229 #else
230 #define dbug_print_table(a,b)
231 #endif
235 Run a query through mysql_parse
237 Used to:
238 - purging the ndb_binlog_index
239 - creating the ndb_apply_status table
241 static void run_query(THD *thd, char *buf, char *end,
242 const int *no_print_error, my_bool disable_binlog)
244 ulong save_thd_query_length= thd->query_length();
245 char *save_thd_query= thd->query();
246 ulong save_thread_id= thd->variables.pseudo_thread_id;
247 struct system_status_var save_thd_status_var= thd->status_var;
248 THD_TRANS save_thd_transaction_all= thd->transaction.all;
249 THD_TRANS save_thd_transaction_stmt= thd->transaction.stmt;
250 ulonglong save_thd_options= thd->options;
251 DBUG_ASSERT(sizeof(save_thd_options) == sizeof(thd->options));
252 NET save_thd_net= thd->net;
253 const char* found_semicolon= NULL;
255 bzero((char*) &thd->net, sizeof(NET));
256 thd->set_query(buf, (uint) (end - buf));
257 thd->variables.pseudo_thread_id= thread_id;
258 thd->transaction.stmt.modified_non_trans_table= FALSE;
259 if (disable_binlog)
260 thd->options&= ~OPTION_BIN_LOG;
262 DBUG_PRINT("query", ("%s", thd->query()));
264 DBUG_ASSERT(!thd->in_sub_stmt);
265 DBUG_ASSERT(!thd->prelocked_mode);
267 mysql_parse(thd, thd->query(), thd->query_length(), &found_semicolon);
269 if (no_print_error && thd->is_slave_error)
271 int i;
272 Thd_ndb *thd_ndb= get_thd_ndb(thd);
273 for (i= 0; no_print_error[i]; i++)
274 if ((thd_ndb->m_error_code == no_print_error[i]) ||
275 (thd->main_da.sql_errno() == (unsigned) no_print_error[i]))
276 break;
277 if (!no_print_error[i])
278 sql_print_error("NDB: %s: error %s %d(ndb: %d) %d %d",
279 buf,
280 thd->main_da.message(),
281 thd->main_da.sql_errno(),
282 thd_ndb->m_error_code,
283 (int) thd->is_error(), thd->is_slave_error);
285 close_thread_tables(thd);
287 XXX: this code is broken. mysql_parse()/mysql_reset_thd_for_next_command()
288 can not be called from within a statement, and
289 run_query() can be called from anywhere, including from within
290 a sub-statement.
291 This particular reset is a temporary hack to avoid an assert
292 for double assignment of the diagnostics area when run_query()
293 is called from ndbcluster_reset_logs(), which is called from
294 mysql_flush().
296 thd->main_da.reset_diagnostics_area();
298 thd->options= save_thd_options;
299 thd->set_query(save_thd_query, save_thd_query_length);
300 thd->variables.pseudo_thread_id= save_thread_id;
301 thd->status_var= save_thd_status_var;
302 thd->transaction.all= save_thd_transaction_all;
303 thd->transaction.stmt= save_thd_transaction_stmt;
304 thd->net= save_thd_net;
306 if (thd == injector_thd)
309 running the query will close all tables, including the ndb_binlog_index
310 used in injector_thd
312 ndb_binlog_index= 0;
316 static void
317 ndbcluster_binlog_close_table(THD *thd, NDB_SHARE *share)
319 DBUG_ENTER("ndbcluster_binlog_close_table");
320 if (share->table_share)
322 closefrm(share->table, 1);
323 share->table_share= 0;
324 share->table= 0;
326 DBUG_ASSERT(share->table == 0);
327 DBUG_VOID_RETURN;
332 Creates a TABLE object for the ndb cluster table
334 NOTES
335 This does not open the underlying table
338 static int
339 ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share,
340 TABLE_SHARE *table_share, TABLE *table,
341 int reopen)
343 int error;
344 DBUG_ENTER("ndbcluster_binlog_open_table");
346 safe_mutex_assert_owner(&LOCK_open);
347 init_tmp_table_share(thd, table_share, share->db, 0, share->table_name,
348 share->key);
349 if ((error= open_table_def(thd, table_share, 0)))
351 DBUG_PRINT("error", ("open_table_def failed: %d my_errno: %d", error, my_errno));
352 free_table_share(table_share);
353 DBUG_RETURN(error);
355 if ((error= open_table_from_share(thd, table_share, "", 0 /* fon't allocate buffers */,
356 (uint) READ_ALL, 0, table, FALSE)))
358 DBUG_PRINT("error", ("open_table_from_share failed %d my_errno: %d", error, my_errno));
359 free_table_share(table_share);
360 DBUG_RETURN(error);
362 assign_new_table_id(table_share);
364 if (!reopen)
366 // allocate memory on ndb share so it can be reused after online alter table
367 (void)multi_alloc_root(&share->mem_root,
368 &(share->record[0]), table->s->rec_buff_length,
369 &(share->record[1]), table->s->rec_buff_length,
370 NULL);
373 my_ptrdiff_t row_offset= share->record[0] - table->record[0];
374 Field **p_field;
375 for (p_field= table->field; *p_field; p_field++)
376 (*p_field)->move_field_offset(row_offset);
377 table->record[0]= share->record[0];
378 table->record[1]= share->record[1];
381 table->in_use= injector_thd;
383 table->s->db.str= share->db;
384 table->s->db.length= strlen(share->db);
385 table->s->table_name.str= share->table_name;
386 table->s->table_name.length= strlen(share->table_name);
388 DBUG_ASSERT(share->table_share == 0);
389 share->table_share= table_share;
390 DBUG_ASSERT(share->table == 0);
391 share->table= table;
392 /* We can't use 'use_all_columns()' as the file object is not setup yet */
393 table->column_bitmaps_set_no_signal(&table->s->all_set, &table->s->all_set);
394 #ifndef DBUG_OFF
395 dbug_print_table("table", table);
396 #endif
397 DBUG_RETURN(0);
402 Initialize the binlog part of the NDB_SHARE
404 int ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
406 THD *thd= current_thd;
407 MEM_ROOT *mem_root= &share->mem_root;
408 int do_event_op= ndb_binlog_running;
409 int error= 0;
410 DBUG_ENTER("ndbcluster_binlog_init_share");
412 share->connect_count= g_ndb_cluster_connection->get_connect_count();
414 share->op= 0;
415 share->table= 0;
417 if (!ndb_schema_share &&
418 strcmp(share->db, NDB_REP_DB) == 0 &&
419 strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
420 do_event_op= 1;
421 else if (!ndb_apply_status_share &&
422 strcmp(share->db, NDB_REP_DB) == 0 &&
423 strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
424 do_event_op= 1;
427 int i, no_nodes= g_ndb_cluster_connection->no_db_nodes();
428 share->subscriber_bitmap= (MY_BITMAP*)
429 alloc_root(mem_root, no_nodes * sizeof(MY_BITMAP));
430 for (i= 0; i < no_nodes; i++)
432 bitmap_init(&share->subscriber_bitmap[i],
433 (Uint32*)alloc_root(mem_root, max_ndb_nodes/8),
434 max_ndb_nodes, FALSE);
435 bitmap_clear_all(&share->subscriber_bitmap[i]);
439 if (!do_event_op)
441 if (_table)
443 if (_table->s->primary_key == MAX_KEY)
444 share->flags|= NSF_HIDDEN_PK;
445 if (_table->s->blob_fields != 0)
446 share->flags|= NSF_BLOB_FLAG;
448 else
450 share->flags|= NSF_NO_BINLOG;
452 DBUG_RETURN(error);
454 while (1)
456 int error;
457 TABLE_SHARE *table_share= (TABLE_SHARE *) alloc_root(mem_root, sizeof(*table_share));
458 TABLE *table= (TABLE*) alloc_root(mem_root, sizeof(*table));
459 if ((error= ndbcluster_binlog_open_table(thd, share, table_share, table, 0)))
460 break;
462 ! do not touch the contents of the table
463 it may be in use by the injector thread
465 MEM_ROOT *mem_root= &share->mem_root;
466 share->ndb_value[0]= (NdbValue*)
467 alloc_root(mem_root, sizeof(NdbValue) *
468 (table->s->fields + 2 /*extra for hidden key and part key*/));
469 share->ndb_value[1]= (NdbValue*)
470 alloc_root(mem_root, sizeof(NdbValue) *
471 (table->s->fields + 2 /*extra for hidden key and part key*/));
473 if (table->s->primary_key == MAX_KEY)
474 share->flags|= NSF_HIDDEN_PK;
475 if (table->s->blob_fields != 0)
476 share->flags|= NSF_BLOB_FLAG;
477 break;
479 DBUG_RETURN(error);
482 /*****************************************************************
483 functions called from master sql client threads
484 ****************************************************************/
487 called in mysql_show_binlog_events and reset_logs to make sure we wait for
488 all events originating from this mysql server to arrive in the binlog
490 Wait for the last epoch in which the last transaction is a part of.
492 Wait a maximum of 30 seconds.
494 static void ndbcluster_binlog_wait(THD *thd)
496 if (ndb_binlog_running)
498 DBUG_ENTER("ndbcluster_binlog_wait");
499 const char *save_info= thd ? thd->proc_info : 0;
500 ulonglong wait_epoch= *p_latest_trans_gci;
501 int count= 30;
502 if (thd)
503 thd->proc_info= "Waiting for ndbcluster binlog update to "
504 "reach current position";
505 while (count && ndb_binlog_running &&
506 ndb_latest_handled_binlog_epoch < wait_epoch)
508 count--;
509 sleep(1);
511 if (thd)
512 thd->proc_info= save_info;
513 DBUG_VOID_RETURN;
518 Called from MYSQL_BIN_LOG::reset_logs in log.cc when binlog is emptied
520 static int ndbcluster_reset_logs(THD *thd)
522 if (!ndb_binlog_running)
523 return 0;
525 DBUG_ENTER("ndbcluster_reset_logs");
528 Wait for all events orifinating from this mysql server has
529 reached the binlog before continuing to reset
531 ndbcluster_binlog_wait(thd);
533 char buf[1024];
534 char *end= strmov(buf, "DELETE FROM " NDB_REP_DB "." NDB_REP_TABLE);
536 run_query(thd, buf, end, NULL, TRUE);
538 DBUG_RETURN(0);
542 Called from MYSQL_BIN_LOG::purge_logs in log.cc when the binlog "file"
543 is removed
546 static int
547 ndbcluster_binlog_index_purge_file(THD *thd, const char *file)
549 if (!ndb_binlog_running || thd->slave_thread)
550 return 0;
552 DBUG_ENTER("ndbcluster_binlog_index_purge_file");
553 DBUG_PRINT("enter", ("file: %s", file));
555 char buf[1024];
556 char *end= strmov(strmov(strmov(buf,
557 "DELETE FROM "
558 NDB_REP_DB "." NDB_REP_TABLE
559 " WHERE File='"), file), "'");
561 run_query(thd, buf, end, NULL, TRUE);
563 DBUG_RETURN(0);
566 static void
567 ndbcluster_binlog_log_query(handlerton *hton, THD *thd, enum_binlog_command binlog_command,
568 const char *query, uint query_length,
569 const char *db, const char *table_name)
571 DBUG_ENTER("ndbcluster_binlog_log_query");
572 DBUG_PRINT("enter", ("db: %s table_name: %s query: %s",
573 db, table_name, query));
574 enum SCHEMA_OP_TYPE type;
575 int log= 0;
576 switch (binlog_command)
578 case LOGCOM_CREATE_TABLE:
579 type= SOT_CREATE_TABLE;
580 DBUG_ASSERT(FALSE);
581 break;
582 case LOGCOM_ALTER_TABLE:
583 type= SOT_ALTER_TABLE;
584 log= 1;
585 break;
586 case LOGCOM_RENAME_TABLE:
587 type= SOT_RENAME_TABLE;
588 DBUG_ASSERT(FALSE);
589 break;
590 case LOGCOM_DROP_TABLE:
591 type= SOT_DROP_TABLE;
592 DBUG_ASSERT(FALSE);
593 break;
594 case LOGCOM_CREATE_DB:
595 type= SOT_CREATE_DB;
596 log= 1;
597 break;
598 case LOGCOM_ALTER_DB:
599 type= SOT_ALTER_DB;
600 log= 1;
601 break;
602 case LOGCOM_DROP_DB:
603 type= SOT_DROP_DB;
604 DBUG_ASSERT(FALSE);
605 break;
607 if (log)
609 ndbcluster_log_schema_op(thd, 0, query, query_length,
610 db, table_name, 0, 0, type,
611 0, 0, 0);
613 DBUG_VOID_RETURN;
618 End use of the NDB Cluster binlog
619 - wait for binlog thread to shutdown
622 static int ndbcluster_binlog_end(THD *thd)
624 DBUG_ENTER("ndbcluster_binlog_end");
626 if (!ndbcluster_binlog_inited)
627 DBUG_RETURN(0);
628 ndbcluster_binlog_inited= 0;
630 #ifdef HAVE_NDB_BINLOG
631 if (ndb_util_thread_running > 0)
634 Wait for util thread to die (as this uses the injector mutex)
635 There is a very small change that ndb_util_thread dies and the
636 following mutex is freed before it's accessed. This shouldn't
637 however be a likely case as the ndbcluster_binlog_end is supposed to
638 be called before ndb_cluster_end().
640 pthread_mutex_lock(&LOCK_ndb_util_thread);
641 /* Ensure mutex are not freed if ndb_cluster_end is running at same time */
642 ndb_util_thread_running++;
643 ndbcluster_terminating= 1;
644 pthread_cond_signal(&COND_ndb_util_thread);
645 while (ndb_util_thread_running > 1)
646 pthread_cond_wait(&COND_ndb_util_ready, &LOCK_ndb_util_thread);
647 ndb_util_thread_running--;
648 pthread_mutex_unlock(&LOCK_ndb_util_thread);
651 /* wait for injector thread to finish */
652 ndbcluster_binlog_terminating= 1;
653 pthread_mutex_lock(&injector_mutex);
654 pthread_cond_signal(&injector_cond);
655 while (ndb_binlog_thread_running > 0)
656 pthread_cond_wait(&injector_cond, &injector_mutex);
657 pthread_mutex_unlock(&injector_mutex);
659 pthread_mutex_destroy(&injector_mutex);
660 pthread_cond_destroy(&injector_cond);
661 pthread_mutex_destroy(&ndb_schema_share_mutex);
662 #endif
664 DBUG_RETURN(0);
667 /*****************************************************************
668 functions called from slave sql client threads
669 ****************************************************************/
670 static void ndbcluster_reset_slave(THD *thd)
672 if (!ndb_binlog_running)
673 return;
675 DBUG_ENTER("ndbcluster_reset_slave");
676 char buf[1024];
677 char *end= strmov(buf, "DELETE FROM " NDB_REP_DB "." NDB_APPLY_TABLE);
678 run_query(thd, buf, end, NULL, TRUE);
679 DBUG_VOID_RETURN;
683 Initialize the binlog part of the ndb handlerton
687 Upon the sql command flush logs, we need to ensure that all outstanding
688 ndb data to be logged has made it to the binary log to get a deterministic
689 behavior on the rotation of the log.
691 static bool ndbcluster_flush_logs(handlerton *hton)
693 ndbcluster_binlog_wait(current_thd);
694 return FALSE;
697 static int ndbcluster_binlog_func(handlerton *hton, THD *thd,
698 enum_binlog_func fn,
699 void *arg)
701 switch(fn)
703 case BFN_RESET_LOGS:
704 ndbcluster_reset_logs(thd);
705 break;
706 case BFN_RESET_SLAVE:
707 ndbcluster_reset_slave(thd);
708 break;
709 case BFN_BINLOG_WAIT:
710 ndbcluster_binlog_wait(thd);
711 break;
712 case BFN_BINLOG_END:
713 ndbcluster_binlog_end(thd);
714 break;
715 case BFN_BINLOG_PURGE_FILE:
716 ndbcluster_binlog_index_purge_file(thd, (const char *)arg);
717 break;
719 return 0;
722 void ndbcluster_binlog_init_handlerton()
724 handlerton *h= ndbcluster_hton;
725 h->flush_logs= ndbcluster_flush_logs;
726 h->binlog_func= ndbcluster_binlog_func;
727 h->binlog_log_query= ndbcluster_binlog_log_query;
735 check the availability af the ndb_apply_status share
736 - return share, but do not increase refcount
737 - return 0 if there is no share
739 static NDB_SHARE *ndbcluster_check_ndb_apply_status_share()
741 pthread_mutex_lock(&ndbcluster_mutex);
743 void *share= hash_search(&ndbcluster_open_tables,
744 (uchar*) NDB_APPLY_TABLE_FILE,
745 sizeof(NDB_APPLY_TABLE_FILE) - 1);
746 DBUG_PRINT("info",("ndbcluster_check_ndb_apply_status_share %s 0x%lx",
747 NDB_APPLY_TABLE_FILE, (long) share));
748 pthread_mutex_unlock(&ndbcluster_mutex);
749 return (NDB_SHARE*) share;
753 check the availability af the schema share
754 - return share, but do not increase refcount
755 - return 0 if there is no share
757 static NDB_SHARE *ndbcluster_check_ndb_schema_share()
759 pthread_mutex_lock(&ndbcluster_mutex);
761 void *share= hash_search(&ndbcluster_open_tables,
762 (uchar*) NDB_SCHEMA_TABLE_FILE,
763 sizeof(NDB_SCHEMA_TABLE_FILE) - 1);
764 DBUG_PRINT("info",("ndbcluster_check_ndb_schema_share %s 0x%lx",
765 NDB_SCHEMA_TABLE_FILE, (long) share));
766 pthread_mutex_unlock(&ndbcluster_mutex);
767 return (NDB_SHARE*) share;
771 Create the ndb_apply_status table
773 static int ndbcluster_create_ndb_apply_status_table(THD *thd)
775 DBUG_ENTER("ndbcluster_create_ndb_apply_status_table");
778 Check if we already have the apply status table.
779 If so it should have been discovered at startup
780 and thus have a share
783 if (ndbcluster_check_ndb_apply_status_share())
784 DBUG_RETURN(0);
786 if (g_ndb_cluster_connection->get_no_ready() <= 0)
787 DBUG_RETURN(0);
789 char buf[1024 + 1], *end;
791 if (ndb_extra_logging)
792 sql_print_information("NDB: Creating " NDB_REP_DB "." NDB_APPLY_TABLE);
795 Check if apply status table exists in MySQL "dictionary"
796 if so, remove it since there is none in Ndb
799 build_table_filename(buf, sizeof(buf) - 1,
800 NDB_REP_DB, NDB_APPLY_TABLE, reg_ext, 0);
801 my_delete(buf, MYF(0));
805 Note, updating this table schema must be reflected in ndb_restore
807 end= strmov(buf, "CREATE TABLE IF NOT EXISTS "
808 NDB_REP_DB "." NDB_APPLY_TABLE
809 " ( server_id INT UNSIGNED NOT NULL,"
810 " epoch BIGINT UNSIGNED NOT NULL, "
811 " log_name VARCHAR(255) BINARY NOT NULL, "
812 " start_pos BIGINT UNSIGNED NOT NULL, "
813 " end_pos BIGINT UNSIGNED NOT NULL, "
814 " PRIMARY KEY USING HASH (server_id) ) ENGINE=NDB CHARACTER SET latin1");
816 const int no_print_error[6]= {ER_TABLE_EXISTS_ERROR,
817 701,
818 702,
819 721, // Table already exist
820 4009,
821 0}; // do not print error 701 etc
822 run_query(thd, buf, end, no_print_error, TRUE);
824 DBUG_RETURN(0);
829 Create the schema table
831 static int ndbcluster_create_schema_table(THD *thd)
833 DBUG_ENTER("ndbcluster_create_schema_table");
836 Check if we already have the schema table.
837 If so it should have been discovered at startup
838 and thus have a share
841 if (ndbcluster_check_ndb_schema_share())
842 DBUG_RETURN(0);
844 if (g_ndb_cluster_connection->get_no_ready() <= 0)
845 DBUG_RETURN(0);
847 char buf[1024 + 1], *end;
849 if (ndb_extra_logging)
850 sql_print_information("NDB: Creating " NDB_REP_DB "." NDB_SCHEMA_TABLE);
853 Check if schema table exists in MySQL "dictionary"
854 if so, remove it since there is none in Ndb
857 build_table_filename(buf, sizeof(buf) - 1,
858 NDB_REP_DB, NDB_SCHEMA_TABLE, reg_ext, 0);
859 my_delete(buf, MYF(0));
863 Update the defines below to reflect the table schema
865 end= strmov(buf, "CREATE TABLE IF NOT EXISTS "
866 NDB_REP_DB "." NDB_SCHEMA_TABLE
867 " ( db VARBINARY(63) NOT NULL,"
868 " name VARBINARY(63) NOT NULL,"
869 " slock BINARY(32) NOT NULL,"
870 " query BLOB NOT NULL,"
871 " node_id INT UNSIGNED NOT NULL,"
872 " epoch BIGINT UNSIGNED NOT NULL,"
873 " id INT UNSIGNED NOT NULL,"
874 " version INT UNSIGNED NOT NULL,"
875 " type INT UNSIGNED NOT NULL,"
876 " PRIMARY KEY USING HASH (db,name) ) ENGINE=NDB CHARACTER SET latin1");
878 const int no_print_error[6]= {ER_TABLE_EXISTS_ERROR,
879 701,
880 702,
881 721, // Table already exist
882 4009,
883 0}; // do not print error 701 etc
884 run_query(thd, buf, end, no_print_error, TRUE);
886 DBUG_RETURN(0);
889 int ndbcluster_setup_binlog_table_shares(THD *thd)
891 if (!ndb_schema_share &&
892 ndbcluster_check_ndb_schema_share() == 0)
894 pthread_mutex_lock(&LOCK_open);
895 ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_SCHEMA_TABLE);
896 pthread_mutex_unlock(&LOCK_open);
897 if (!ndb_schema_share)
899 ndbcluster_create_schema_table(thd);
900 // always make sure we create the 'schema' first
901 if (!ndb_schema_share)
902 return 1;
905 if (!ndb_apply_status_share &&
906 ndbcluster_check_ndb_apply_status_share() == 0)
908 pthread_mutex_lock(&LOCK_open);
909 ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_APPLY_TABLE);
910 pthread_mutex_unlock(&LOCK_open);
911 if (!ndb_apply_status_share)
913 ndbcluster_create_ndb_apply_status_table(thd);
914 if (!ndb_apply_status_share)
915 return 1;
918 if (!ndbcluster_find_all_files(thd))
920 pthread_mutex_lock(&LOCK_open);
921 ndb_binlog_tables_inited= TRUE;
922 if (ndb_extra_logging)
923 sql_print_information("NDB Binlog: ndb tables writable");
924 close_cached_tables(NULL, NULL, TRUE, FALSE, FALSE);
925 pthread_mutex_unlock(&LOCK_open);
926 /* Signal injector thread that all is setup */
927 pthread_cond_signal(&injector_cond);
929 return 0;
933 Defines and struct for schema table.
934 Should reflect table definition above.
936 #define SCHEMA_DB_I 0u
937 #define SCHEMA_NAME_I 1u
938 #define SCHEMA_SLOCK_I 2u
939 #define SCHEMA_QUERY_I 3u
940 #define SCHEMA_NODE_ID_I 4u
941 #define SCHEMA_EPOCH_I 5u
942 #define SCHEMA_ID_I 6u
943 #define SCHEMA_VERSION_I 7u
944 #define SCHEMA_TYPE_I 8u
945 #define SCHEMA_SIZE 9u
946 #define SCHEMA_SLOCK_SIZE 32u
/*
  In-memory copy of one row of the schema table, filled in by
  ndbcluster_get_schema().  Must be kept in step with the table definition
  in ndbcluster_create_schema_table() and with the SCHEMA_*_I defines.
*/
struct Cluster_schema
{
  uchar db_length;                    /* length of db, excluding the terminating zero */
  char db[64];                        /* "db" column, zero terminated */
  uchar name_length;                  /* length of name, excluding the terminating zero */
  char name[64];                      /* "name" column, zero terminated */
  uchar slock_length;                 /* length in bytes of the "slock" column */
  uint32 slock[SCHEMA_SLOCK_SIZE/4];  /* "slock" column, copied as raw bytes */
  unsigned short query_length;        /* length of query */
  char *query;                        /* "query" column, copied with sql_strmake() */
  Uint64 epoch;                       /* "epoch" column */
  uint32 node_id;                     /* "node_id" column */
  uint32 id;                          /* "id" column */
  uint32 version;                     /* "version" column */
  uint32 type;                        /* "type" column */
  uint32 any_value;                   /* NOTE(review): not set by ndbcluster_get_schema(); presumably filled in elsewhere - confirm */
};
967 Transfer schema table data into corresponding struct
969 static void ndbcluster_get_schema(NDB_SHARE *share,
970 Cluster_schema *s)
972 TABLE *table= share->table;
973 Field **field;
974 /* unpack blob values */
975 uchar* blobs_buffer= 0;
976 uint blobs_buffer_size= 0;
977 my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
979 ptrdiff_t ptrdiff= 0;
980 int ret= get_ndb_blobs_value(table, share->ndb_value[0],
981 blobs_buffer, blobs_buffer_size,
982 ptrdiff);
983 if (ret != 0)
985 my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
986 DBUG_PRINT("info", ("blob read error"));
987 DBUG_ASSERT(FALSE);
990 /* db varchar 1 length uchar */
991 field= table->field;
992 s->db_length= *(uint8*)(*field)->ptr;
993 DBUG_ASSERT(s->db_length <= (*field)->field_length);
994 DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->db));
995 memcpy(s->db, (*field)->ptr + 1, s->db_length);
996 s->db[s->db_length]= 0;
997 /* name varchar 1 length uchar */
998 field++;
999 s->name_length= *(uint8*)(*field)->ptr;
1000 DBUG_ASSERT(s->name_length <= (*field)->field_length);
1001 DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->name));
1002 memcpy(s->name, (*field)->ptr + 1, s->name_length);
1003 s->name[s->name_length]= 0;
1004 /* slock fixed length */
1005 field++;
1006 s->slock_length= (*field)->field_length;
1007 DBUG_ASSERT((*field)->field_length == sizeof(s->slock));
1008 memcpy(s->slock, (*field)->ptr, s->slock_length);
1009 /* query blob */
1010 field++;
1012 Field_blob *field_blob= (Field_blob*)(*field);
1013 uint blob_len= field_blob->get_length((*field)->ptr);
1014 uchar *blob_ptr= 0;
1015 field_blob->get_ptr(&blob_ptr);
1016 DBUG_ASSERT(blob_len == 0 || blob_ptr != 0);
1017 s->query_length= blob_len;
1018 s->query= sql_strmake((char*) blob_ptr, blob_len);
1020 /* node_id */
1021 field++;
1022 s->node_id= ((Field_long *)*field)->val_int();
1023 /* epoch */
1024 field++;
1025 s->epoch= ((Field_long *)*field)->val_int();
1026 /* id */
1027 field++;
1028 s->id= ((Field_long *)*field)->val_int();
1029 /* version */
1030 field++;
1031 s->version= ((Field_long *)*field)->val_int();
1032 /* type */
1033 field++;
1034 s->type= ((Field_long *)*field)->val_int();
1035 /* free blobs buffer */
1036 my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
1037 dbug_tmp_restore_column_map(table->read_set, old_map);
1041 helper function to pack a ndb varchar
1043 char *ndb_pack_varchar(const NDBCOL *col, char *buf,
1044 const char *str, int sz)
1046 switch (col->getArrayType())
1048 case NDBCOL::ArrayTypeFixed:
1049 memcpy(buf, str, sz);
1050 break;
1051 case NDBCOL::ArrayTypeShortVar:
1052 *(uchar*)buf= (uchar)sz;
1053 memcpy(buf + 1, str, sz);
1054 break;
1055 case NDBCOL::ArrayTypeMediumVar:
1056 int2store(buf, sz);
1057 memcpy(buf + 2, str, sz);
1058 break;
1060 return buf;
1064 acknowledge handling of schema operation
1066 static int
1067 ndbcluster_update_slock(THD *thd,
1068 const char *db,
1069 const char *table_name)
1071 DBUG_ENTER("ndbcluster_update_slock");
1072 if (!ndb_schema_share)
1074 DBUG_RETURN(0);
1077 const NdbError *ndb_error= 0;
1078 uint32 node_id= g_ndb_cluster_connection->node_id();
1079 Ndb *ndb= check_ndb_in_thd(thd);
1080 char save_db[FN_HEADLEN];
1081 strcpy(save_db, ndb->getDatabaseName());
1083 char tmp_buf[FN_REFLEN];
1084 NDBDICT *dict= ndb->getDictionary();
1085 ndb->setDatabaseName(NDB_REP_DB);
1086 Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
1087 const NDBTAB *ndbtab= ndbtab_g.get_table();
1088 NdbTransaction *trans= 0;
1089 int retries= 100;
1090 int retry_sleep= 10; /* 10 milliseconds, transaction */
1091 const NDBCOL *col[SCHEMA_SIZE];
1092 unsigned sz[SCHEMA_SIZE];
1094 MY_BITMAP slock;
1095 uint32 bitbuf[SCHEMA_SLOCK_SIZE/4];
1096 bitmap_init(&slock, bitbuf, sizeof(bitbuf)*8, false);
1098 if (ndbtab == 0)
1100 abort();
1101 DBUG_RETURN(0);
1105 uint i;
1106 for (i= 0; i < SCHEMA_SIZE; i++)
1108 col[i]= ndbtab->getColumn(i);
1109 if (i != SCHEMA_QUERY_I)
1111 sz[i]= col[i]->getLength();
1112 DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
1117 while (1)
1119 if ((trans= ndb->startTransaction()) == 0)
1120 goto err;
1122 NdbOperation *op= 0;
1123 int r= 0;
1125 /* read the bitmap exlusive */
1126 r|= (op= trans->getNdbOperation(ndbtab)) == 0;
1127 DBUG_ASSERT(r == 0);
1128 r|= op->readTupleExclusive();
1129 DBUG_ASSERT(r == 0);
1131 /* db */
1132 ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
1133 r|= op->equal(SCHEMA_DB_I, tmp_buf);
1134 DBUG_ASSERT(r == 0);
1135 /* name */
1136 ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
1137 strlen(table_name));
1138 r|= op->equal(SCHEMA_NAME_I, tmp_buf);
1139 DBUG_ASSERT(r == 0);
1140 /* slock */
1141 r|= op->getValue(SCHEMA_SLOCK_I, (char*)slock.bitmap) == 0;
1142 DBUG_ASSERT(r == 0);
1144 if (trans->execute(NdbTransaction::NoCommit))
1145 goto err;
1146 bitmap_clear_bit(&slock, node_id);
1148 NdbOperation *op= 0;
1149 int r= 0;
1151 /* now update the tuple */
1152 r|= (op= trans->getNdbOperation(ndbtab)) == 0;
1153 DBUG_ASSERT(r == 0);
1154 r|= op->updateTuple();
1155 DBUG_ASSERT(r == 0);
1157 /* db */
1158 ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
1159 r|= op->equal(SCHEMA_DB_I, tmp_buf);
1160 DBUG_ASSERT(r == 0);
1161 /* name */
1162 ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
1163 strlen(table_name));
1164 r|= op->equal(SCHEMA_NAME_I, tmp_buf);
1165 DBUG_ASSERT(r == 0);
1166 /* slock */
1167 r|= op->setValue(SCHEMA_SLOCK_I, (char*)slock.bitmap);
1168 DBUG_ASSERT(r == 0);
1169 /* node_id */
1170 r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
1171 DBUG_ASSERT(r == 0);
1172 /* type */
1173 r|= op->setValue(SCHEMA_TYPE_I, (uint32)SOT_CLEAR_SLOCK);
1174 DBUG_ASSERT(r == 0);
1176 if (trans->execute(NdbTransaction::Commit) == 0)
1178 dict->forceGCPWait();
1179 DBUG_PRINT("info", ("node %d cleared lock on '%s.%s'",
1180 node_id, db, table_name));
1181 break;
1183 err:
1184 const NdbError *this_error= trans ?
1185 &trans->getNdbError() : &ndb->getNdbError();
1186 if (this_error->status == NdbError::TemporaryError)
1188 if (retries--)
1190 if (trans)
1191 ndb->closeTransaction(trans);
1192 my_sleep(retry_sleep);
1193 continue; // retry
1196 ndb_error= this_error;
1197 break;
1200 if (ndb_error)
1202 char buf[1024];
1203 my_snprintf(buf, sizeof(buf), "Could not release lock on '%s.%s'",
1204 db, table_name);
1205 push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
1206 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
1207 ndb_error->code, ndb_error->message, buf);
1209 if (trans)
1210 ndb->closeTransaction(trans);
1211 ndb->setDatabaseName(save_db);
1212 DBUG_RETURN(0);
1216 log query in schema table
1218 static void ndb_report_waiting(const char *key,
1219 int the_time,
1220 const char *op,
1221 const char *obj)
1223 ulonglong ndb_latest_epoch= 0;
1224 const char *proc_info= "<no info>";
1225 pthread_mutex_lock(&injector_mutex);
1226 if (injector_ndb)
1227 ndb_latest_epoch= injector_ndb->getLatestGCI();
1228 if (injector_thd)
1229 proc_info= injector_thd->proc_info;
1230 pthread_mutex_unlock(&injector_mutex);
1231 sql_print_information("NDB %s:"
1232 " waiting max %u sec for %s %s."
1233 " epochs: (%u,%u,%u)"
1234 " injector proc_info: %s"
1235 ,key, the_time, op, obj
1236 ,(uint)ndb_latest_handled_binlog_epoch
1237 ,(uint)ndb_latest_received_binlog_epoch
1238 ,(uint)ndb_latest_epoch
1239 ,proc_info
/*
  ndbcluster_log_schema_op

  Write a schema operation as a row in the NDB_SCHEMA_TABLE table of the
  NDB_REP_DB database and then wait for the subscribed servers to
  acknowledge it.

  Outline of the visible logic:
  - A Thd_ndb is fetched for thd, or seized and attached when missing.
    Failure to seize one is the only path that returns 1.
  - Returns 0 at once when ndb_schema_share is not set or when
    TNO_NO_LOG_SCHEMA_OP is set in thd_ndb->options.
  - For SOT_DROP_TABLE and SOT_RENAME_TABLE the query text is rebuilt in
    tmp_buf2 from quoted identifiers and replaces the query argument.
    A drop table that is part of SQLCOM_DROP_DB returns 0 without logging.
    An unknown type calls abort().
  - schema_subscribers starts with all bits set and is intersected with
    every non-empty bitmap in ndb_schema_share->subscriber_bitmap. The
    result is copied into ndb_schema_object->slock.
  - The row is written with writeTuple inside a transaction. Temporary
    errors are retried (counter retries, pause my_sleep(retry_sleep)).
    When new_db and new_table_name are given, a second tuple keyed on the
    new name is written in the same transaction with type
    SOT_RENAME_TABLE_NEW and the all-zero bitbuf_e as its slock.
  - If no error occurred and schema_subscribers is not empty, the function
    loops on pthread_cond_timedwait(injector_cond) until
    ndb_schema_object->slock_bitmap is clear, thd->killed is set,
    ndb_schema_share is gone, or max_timeout (initialised from
    opt_ndb_sync_timeout) timed-out waits have elapsed.

  Parameters:
    thd                         session issuing the operation
    share                       not referenced in the visible body
    query, query_length         statement text stored in the query blob
    db, table_name              key of the schema row
    ndb_table_id, ndb_table_version
                                stored in SCHEMA_ID_I and SCHEMA_VERSION_I
    type                        schema operation type
    new_db, new_table_name      target name for a rename, may be null
    have_lock_open              when non-zero, LOCK_open is released for the
                                duration of the wait and re-taken afterwards

  Returns 0 on every path except the Thd_ndb allocation failure (1).
  NDB errors are reported as a warning on thd, not through the return value.

  NOTE(review): the warning pushed after the end label passes a text that
  still contains a %s placeholder as a plain argument, and no query text is
  supplied for it - confirm whether the query was meant to be formatted in.
  NOTE(review): ndb_schema_object is tested for null in several places but
  its mutex is locked without such a test in the wait section - confirm
  that ndb_get_schema_object cannot return null here.
*/
1243 int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
1244 const char *query, int query_length,
1245 const char *db, const char *table_name,
1246 uint32 ndb_table_id,
1247 uint32 ndb_table_version,
1248 enum SCHEMA_OP_TYPE type,
1249 const char *new_db, const char *new_table_name,
1250 int have_lock_open)
1252 DBUG_ENTER("ndbcluster_log_schema_op");
/* Make sure this session has a Thd_ndb, seizing one on demand */
1253 Thd_ndb *thd_ndb= get_thd_ndb(thd);
1254 if (!thd_ndb)
1256 if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
1258 sql_print_error("Could not allocate Thd_ndb object");
1259 DBUG_RETURN(1);
1261 set_thd_ndb(thd, thd_ndb);
1264 DBUG_PRINT("enter",
1265 ("query: %s db: %s table_name: %s thd_ndb->options: %d",
1266 query, db, table_name, thd_ndb->options));
/* Nothing is logged without a schema share or when logging is switched off */
1267 if (!ndb_schema_share || thd_ndb->options & TNO_NO_LOG_SCHEMA_OP)
1269 DBUG_RETURN(0);
1272 char tmp_buf2[FN_REFLEN];
1273 char quoted_table1[2 + 2 * FN_REFLEN + 1];
1274 char quoted_db1[2 + 2 * FN_REFLEN + 1];
1275 char quoted_db2[2 + 2 * FN_REFLEN + 1];
1276 char quoted_table2[2 + 2 * FN_REFLEN + 1];
1277 int id_length= 0;
/* type_str is the label used in log messages for this operation type */
1278 const char *type_str;
1279 switch (type)
1281 case SOT_DROP_TABLE:
1282 /* drop database command, do not log at drop table */
1283 if (thd->lex->sql_command == SQLCOM_DROP_DB)
1284 DBUG_RETURN(0);
1285 /* redo the drop table query as is may contain several tables */
1286 query= tmp_buf2;
1287 id_length= my_strmov_quoted_identifier (thd, (char *) quoted_table1,
1288 table_name, 0);
1289 quoted_table1[id_length]= '\0';
1290 query_length= (uint) (strxmov(tmp_buf2, "drop table ",
1291 quoted_table1, NullS) - tmp_buf2);
1292 type_str= "drop table";
1293 break;
1294 case SOT_RENAME_TABLE:
1295 /* redo the rename table query as is may contain several tables */
1296 query= tmp_buf2;
1297 id_length= my_strmov_quoted_identifier (thd, (char *) quoted_db1,
1298 db, 0);
1299 quoted_db1[id_length]= '\0';
1300 id_length= my_strmov_quoted_identifier (thd, (char *) quoted_table1,
1301 table_name, 0);
1302 quoted_table1[id_length]= '\0';
1303 id_length= my_strmov_quoted_identifier (thd, (char *) quoted_db2,
1304 new_db, 0);
1305 quoted_db2[id_length]= '\0';
1306 id_length= my_strmov_quoted_identifier (thd, (char *) quoted_table2,
1307 new_table_name, 0);
1308 quoted_table2[id_length]= '\0';
1309 query_length= (uint) (strxmov(tmp_buf2, "rename table ",
1310 quoted_db1, ".", quoted_table1, " to ",
1311 quoted_db2, ".", quoted_table2, NullS) - tmp_buf2);
1312 type_str= "rename table";
1313 break;
1314 case SOT_CREATE_TABLE:
1315 type_str= "create table";
1316 break;
1317 case SOT_ALTER_TABLE:
1318 type_str= "alter table";
1319 break;
1320 case SOT_DROP_DB:
1321 type_str= "drop db";
1322 break;
1323 case SOT_CREATE_DB:
1324 type_str= "create db";
1325 break;
1326 case SOT_ALTER_DB:
1327 type_str= "alter db";
1328 break;
1329 case SOT_TABLESPACE:
1330 type_str= "tablespace";
1331 break;
1332 case SOT_LOGFILE_GROUP:
1333 type_str= "logfile group";
1334 break;
1335 case SOT_TRUNCATE_TABLE:
1336 type_str= "truncate table";
1337 break;
1338 default:
1339 abort(); /* should not happen, programming error */
/* Fetch the schema object for the key built from db and table_name */
1342 NDB_SCHEMA_OBJECT *ndb_schema_object;
1344 char key[FN_REFLEN + 1];
1345 build_table_filename(key, sizeof(key) - 1, db, table_name, "", 0);
1346 ndb_schema_object= ndb_get_schema_object(key, TRUE, FALSE);
1349 const NdbError *ndb_error= 0;
1350 uint32 node_id= g_ndb_cluster_connection->node_id();
1351 Uint64 epoch= 0;
1352 MY_BITMAP schema_subscribers;
1353 uint32 bitbuf[sizeof(ndb_schema_object->slock)/4];
/* bitbuf_e is an all-zero slock image, used for the second rename tuple */
1354 char bitbuf_e[sizeof(bitbuf)];
1355 bzero(bitbuf_e, sizeof(bitbuf_e));
1357 int i, updated= 0;
1358 int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes();
/* Start with every bit set, then intersect with each non-empty subscriber bitmap */
1359 bitmap_init(&schema_subscribers, bitbuf, sizeof(bitbuf)*8, FALSE);
1360 bitmap_set_all(&schema_subscribers);
1362 /* begin protect ndb_schema_share */
1363 pthread_mutex_lock(&ndb_schema_share_mutex);
1364 if (ndb_schema_share == 0)
1366 pthread_mutex_unlock(&ndb_schema_share_mutex);
1367 if (ndb_schema_object)
1368 ndb_free_schema_object(&ndb_schema_object, FALSE);
1369 DBUG_RETURN(0);
1371 (void) pthread_mutex_lock(&ndb_schema_share->mutex);
1372 for (i= 0; i < no_storage_nodes; i++)
1374 MY_BITMAP *table_subscribers= &ndb_schema_share->subscriber_bitmap[i];
1375 if (!bitmap_is_clear_all(table_subscribers))
1377 bitmap_intersect(&schema_subscribers,
1378 table_subscribers);
1379 updated= 1;
1382 (void) pthread_mutex_unlock(&ndb_schema_share->mutex);
1383 pthread_mutex_unlock(&ndb_schema_share_mutex);
1384 /* end protect ndb_schema_share */
/* updated is non-zero when at least one subscriber bitmap was non-empty */
1386 if (updated)
1388 bitmap_clear_bit(&schema_subscribers, node_id);
1390 if setting own acknowledge bit it is important that
1391 no other mysqld's are registred, as subsequent code
1392 will cause the original event to be hidden (by blob
1393 merge event code)
1395 if (bitmap_is_clear_all(&schema_subscribers))
1396 bitmap_set_bit(&schema_subscribers, node_id);
1398 else
1399 bitmap_clear_all(&schema_subscribers);
1401 if (ndb_schema_object)
1403 (void) pthread_mutex_lock(&ndb_schema_object->mutex);
1404 memcpy(ndb_schema_object->slock, schema_subscribers.bitmap,
1405 sizeof(ndb_schema_object->slock));
1406 (void) pthread_mutex_unlock(&ndb_schema_object->mutex);
1409 DBUG_DUMP("schema_subscribers", (uchar*)schema_subscribers.bitmap,
1410 no_bytes_in_map(&schema_subscribers));
1411 DBUG_PRINT("info", ("bitmap_is_clear_all(&schema_subscribers): %d",
1412 bitmap_is_clear_all(&schema_subscribers)));
1415 Ndb *ndb= thd_ndb->ndb;
1416 char save_db[FN_REFLEN];
1417 strcpy(save_db, ndb->getDatabaseName());
1419 char tmp_buf[FN_REFLEN];
1420 NDBDICT *dict= ndb->getDictionary();
1421 ndb->setDatabaseName(NDB_REP_DB);
1422 Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
1423 const NDBTAB *ndbtab= ndbtab_g.get_table();
1424 NdbTransaction *trans= 0;
1425 int retries= 100;
1426 int retry_sleep= 10; /* 10 milliseconds, transaction */
1427 const NDBCOL *col[SCHEMA_SIZE];
1428 unsigned sz[SCHEMA_SIZE];
1430 if (ndbtab == 0)
1432 if (strcmp(NDB_REP_DB, db) != 0 ||
1433 strcmp(NDB_SCHEMA_TABLE, table_name))
1435 ndb_error= &dict->getNdbError();
1437 goto end;
1441 uint i;
1442 for (i= 0; i < SCHEMA_SIZE; i++)
1444 col[i]= ndbtab->getColumn(i);
1445 if (i != SCHEMA_QUERY_I)
1447 sz[i]= col[i]->getLength();
1448 DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
1453 while (1)
1455 const char *log_db= db;
1456 const char *log_tab= table_name;
1457 const char *log_subscribers= (char*)schema_subscribers.bitmap;
1458 uint32 log_type= (uint32)type;
1459 if ((trans= ndb->startTransaction()) == 0)
1460 goto err;
1461 while (1)
1463 NdbOperation *op= 0;
1464 int r= 0;
1465 r|= (op= trans->getNdbOperation(ndbtab)) == 0;
1466 DBUG_ASSERT(r == 0);
1467 r|= op->writeTuple();
1468 DBUG_ASSERT(r == 0);
1470 /* db */
1471 ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, log_db, strlen(log_db));
1472 r|= op->equal(SCHEMA_DB_I, tmp_buf);
1473 DBUG_ASSERT(r == 0);
1474 /* name */
1475 ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, log_tab,
1476 strlen(log_tab));
1477 r|= op->equal(SCHEMA_NAME_I, tmp_buf);
1478 DBUG_ASSERT(r == 0);
1479 /* slock */
1480 DBUG_ASSERT(sz[SCHEMA_SLOCK_I] == sizeof(bitbuf));
1481 r|= op->setValue(SCHEMA_SLOCK_I, log_subscribers);
1482 DBUG_ASSERT(r == 0);
1483 /* query */
1485 NdbBlob *ndb_blob= op->getBlobHandle(SCHEMA_QUERY_I);
1486 DBUG_ASSERT(ndb_blob != 0);
1487 uint blob_len= query_length;
1488 const char* blob_ptr= query;
1489 r|= ndb_blob->setValue(blob_ptr, blob_len);
1490 DBUG_ASSERT(r == 0);
1492 /* node_id */
1493 r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
1494 DBUG_ASSERT(r == 0);
1495 /* epoch */
1496 r|= op->setValue(SCHEMA_EPOCH_I, epoch);
1497 DBUG_ASSERT(r == 0);
1498 /* id */
1499 r|= op->setValue(SCHEMA_ID_I, ndb_table_id);
1500 DBUG_ASSERT(r == 0);
1501 /* version */
1502 r|= op->setValue(SCHEMA_VERSION_I, ndb_table_version);
1503 DBUG_ASSERT(r == 0);
1504 /* type */
1505 r|= op->setValue(SCHEMA_TYPE_I, log_type);
1506 DBUG_ASSERT(r == 0);
1507 /* any value */
1508 if (!(thd->options & OPTION_BIN_LOG))
1509 r|= op->setAnyValue(NDB_ANYVALUE_FOR_NOLOGGING);
1510 else
1511 r|= op->setAnyValue(thd->server_id);
1512 DBUG_ASSERT(r == 0);
1513 if (log_db != new_db && new_db && new_table_name)
1515 log_db= new_db;
1516 log_tab= new_table_name;
1517 log_subscribers= bitbuf_e; // no ack expected on this
1518 log_type= (uint32)SOT_RENAME_TABLE_NEW;
1519 continue;
1521 break;
1523 if (trans->execute(NdbTransaction::Commit) == 0)
1525 DBUG_PRINT("info", ("logged: %s", query));
1526 break;
1528 err:
1529 const NdbError *this_error= trans ?
1530 &trans->getNdbError() : &ndb->getNdbError();
1531 if (this_error->status == NdbError::TemporaryError)
1533 if (retries--)
1535 if (trans)
1536 ndb->closeTransaction(trans);
1537 my_sleep(retry_sleep);
1538 continue; // retry
1541 ndb_error= this_error;
1542 break;
1544 end:
1545 if (ndb_error)
1546 push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
1547 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
1548 ndb_error->code,
1549 ndb_error->message,
1550 "Could not log query '%s' on other mysqld's");
1552 if (trans)
1553 ndb->closeTransaction(trans);
1554 ndb->setDatabaseName(save_db);
1557 Wait for other mysqld's to acknowledge the table operation
1559 if (ndb_error == 0 &&
1560 !bitmap_is_clear_all(&schema_subscribers))
1563 if own nodeid is set we are a single mysqld registred
1564 as an optimization we update the slock directly
1566 if (bitmap_is_set(&schema_subscribers, node_id))
1567 ndbcluster_update_slock(thd, db, table_name);
1568 else
1569 dict->forceGCPWait();
1571 int max_timeout= opt_ndb_sync_timeout;
1572 (void) pthread_mutex_lock(&ndb_schema_object->mutex);
1573 if (have_lock_open)
1575 safe_mutex_assert_owner(&LOCK_open);
1576 (void) pthread_mutex_unlock(&LOCK_open);
1578 while (1)
1580 struct timespec abstime;
1581 int i;
1582 int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes();
1583 set_timespec(abstime, 1);
1584 int ret= pthread_cond_timedwait(&injector_cond,
1585 &ndb_schema_object->mutex,
1586 &abstime);
1587 if (thd->killed)
1588 break;
1590 /* begin protect ndb_schema_share */
1591 pthread_mutex_lock(&ndb_schema_share_mutex);
1592 if (ndb_schema_share == 0)
1594 pthread_mutex_unlock(&ndb_schema_share_mutex);
1595 break;
1597 (void) pthread_mutex_lock(&ndb_schema_share->mutex);
1598 for (i= 0; i < no_storage_nodes; i++)
1600 /* remove any unsubscribed from schema_subscribers */
1601 MY_BITMAP *tmp= &ndb_schema_share->subscriber_bitmap[i];
1602 if (!bitmap_is_clear_all(tmp))
1603 bitmap_intersect(&schema_subscribers, tmp);
1605 (void) pthread_mutex_unlock(&ndb_schema_share->mutex);
1606 pthread_mutex_unlock(&ndb_schema_share_mutex);
1607 /* end protect ndb_schema_share */
1609 /* remove any unsubscribed from ndb_schema_object->slock */
1610 bitmap_intersect(&ndb_schema_object->slock_bitmap, &schema_subscribers);
1612 DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap",
1613 (uchar*)ndb_schema_object->slock_bitmap.bitmap,
1614 no_bytes_in_map(&ndb_schema_object->slock_bitmap));
1616 if (bitmap_is_clear_all(&ndb_schema_object->slock_bitmap))
1617 break;
1619 if (ret)
1621 max_timeout--;
1622 if (max_timeout == 0)
1624 sql_print_error("NDB %s: distributing %s timed out. Ignoring...",
1625 type_str, ndb_schema_object->key);
1626 break;
1628 if (ndb_extra_logging)
1629 ndb_report_waiting(type_str, max_timeout,
1630 "distributing", ndb_schema_object->key);
1633 if (have_lock_open)
1635 (void) pthread_mutex_lock(&LOCK_open);
1637 (void) pthread_mutex_unlock(&ndb_schema_object->mutex);
1640 if (ndb_schema_object)
1641 ndb_free_schema_object(&ndb_schema_object, FALSE);
1643 DBUG_RETURN(0);
1647 Handle _non_ data events from the storage nodes
1650 ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
1651 NDB_SHARE *share)
1653 DBUG_ENTER("ndb_handle_schema_change");
1654 TABLE* table= share->table;
1655 TABLE_SHARE *table_share= share->table_share;
1656 const char *dbname= table_share->db.str;
1657 const char *tabname= table_share->table_name.str;
1658 bool do_close_cached_tables= FALSE;
1659 bool is_online_alter_table= FALSE;
1660 bool is_rename_table= FALSE;
1661 bool is_remote_change=
1662 (uint) pOp->getReqNodeId() != g_ndb_cluster_connection->node_id();
1664 if (pOp->getEventType() == NDBEVENT::TE_ALTER)
1666 if (pOp->tableFrmChanged())
1668 DBUG_PRINT("info", ("NDBEVENT::TE_ALTER: table frm changed"));
1669 is_online_alter_table= TRUE;
1671 else
1673 DBUG_PRINT("info", ("NDBEVENT::TE_ALTER: name changed"));
1674 DBUG_ASSERT(pOp->tableNameChanged());
1675 is_rename_table= TRUE;
1680 ndb->setDatabaseName(dbname);
1681 Ndb_table_guard ndbtab_g(ndb->getDictionary(), tabname);
1682 const NDBTAB *ev_tab= pOp->getTable();
1683 const NDBTAB *cache_tab= ndbtab_g.get_table();
1684 if (cache_tab &&
1685 cache_tab->getObjectId() == ev_tab->getObjectId() &&
1686 cache_tab->getObjectVersion() <= ev_tab->getObjectVersion())
1687 ndbtab_g.invalidate();
1691 Refresh local frm file and dictionary cache if
1692 remote on-line alter table
1694 if (is_remote_change && is_online_alter_table)
1696 const char *tabname= table_share->table_name.str;
1697 char key[FN_REFLEN + 1];
1698 uchar *data= 0, *pack_data= 0;
1699 size_t length, pack_length;
1700 int error;
1701 NDBDICT *dict= ndb->getDictionary();
1702 const NDBTAB *altered_table= pOp->getTable();
1704 DBUG_PRINT("info", ("Detected frm change of table %s.%s",
1705 dbname, tabname));
1706 build_table_filename(key, FN_LEN - 1, dbname, tabname, NullS, 0);
1708 If the there is no local table shadowing the altered table and
1709 it has an frm that is different than the one on disk then
1710 overwrite it with the new table definition
1712 if (!ndbcluster_check_if_local_table(dbname, tabname) &&
1713 readfrm(key, &data, &length) == 0 &&
1714 packfrm(data, length, &pack_data, &pack_length) == 0 &&
1715 cmp_frm(altered_table, pack_data, pack_length))
1717 DBUG_DUMP("frm", (uchar*) altered_table->getFrmData(),
1718 altered_table->getFrmLength());
1719 pthread_mutex_lock(&LOCK_open);
1720 Ndb_table_guard ndbtab_g(dict, tabname);
1721 const NDBTAB *old= ndbtab_g.get_table();
1722 if (!old &&
1723 old->getObjectVersion() != altered_table->getObjectVersion())
1724 dict->putTable(altered_table);
1726 my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
1727 data= NULL;
1728 if ((error= unpackfrm(&data, &length,
1729 (const uchar*) altered_table->getFrmData())) ||
1730 (error= writefrm(key, data, length)))
1732 sql_print_information("NDB: Failed write frm for %s.%s, error %d",
1733 dbname, tabname, error);
1736 // copy names as memory will be freed
1737 NdbAutoPtr<char> a1((char *)(dbname= strdup(dbname)));
1738 NdbAutoPtr<char> a2((char *)(tabname= strdup(tabname)));
1739 ndbcluster_binlog_close_table(thd, share);
1741 TABLE_LIST table_list;
1742 bzero((char*) &table_list,sizeof(table_list));
1743 table_list.db= (char *)dbname;
1744 table_list.alias= table_list.table_name= (char *)tabname;
1745 close_cached_tables(thd, &table_list, TRUE, FALSE, FALSE);
1747 if ((error= ndbcluster_binlog_open_table(thd, share,
1748 table_share, table, 1)))
1749 sql_print_information("NDB: Failed to re-open table %s.%s",
1750 dbname, tabname);
1752 table= share->table;
1753 table_share= share->table_share;
1754 dbname= table_share->db.str;
1755 tabname= table_share->table_name.str;
1757 pthread_mutex_unlock(&LOCK_open);
1759 my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
1760 my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR));
1763 // If only frm was changed continue replicating
1764 if (is_online_alter_table)
1766 /* Signal ha_ndbcluster::alter_table that drop is done */
1767 (void) pthread_cond_signal(&injector_cond);
1768 DBUG_RETURN(0);
1771 (void) pthread_mutex_lock(&share->mutex);
1772 if (is_rename_table && !is_remote_change)
1774 DBUG_PRINT("info", ("Detected name change of table %s.%s",
1775 share->db, share->table_name));
1776 /* ToDo: remove printout */
1777 if (ndb_extra_logging)
1778 sql_print_information("NDB Binlog: rename table %s%s/%s -> %s.",
1779 share_prefix, share->table->s->db.str,
1780 share->table->s->table_name.str,
1781 share->key);
1783 ndb->setDatabaseName(share->table->s->db.str);
1784 Ndb_table_guard ndbtab_g(ndb->getDictionary(),
1785 share->table->s->table_name.str);
1786 const NDBTAB *ev_tab= pOp->getTable();
1787 const NDBTAB *cache_tab= ndbtab_g.get_table();
1788 if (cache_tab &&
1789 cache_tab->getObjectId() == ev_tab->getObjectId() &&
1790 cache_tab->getObjectVersion() <= ev_tab->getObjectVersion())
1791 ndbtab_g.invalidate();
1793 /* do the rename of the table in the share */
1794 share->table->s->db.str= share->db;
1795 share->table->s->db.length= strlen(share->db);
1796 share->table->s->table_name.str= share->table_name;
1797 share->table->s->table_name.length= strlen(share->table_name);
1799 DBUG_ASSERT(share->op == pOp || share->op_old == pOp);
1800 if (share->op_old == pOp)
1801 share->op_old= 0;
1802 else
1803 share->op= 0;
1804 // either just us or drop table handling as well
1806 /* Signal ha_ndbcluster::delete/rename_table that drop is done */
1807 (void) pthread_mutex_unlock(&share->mutex);
1808 (void) pthread_cond_signal(&injector_cond);
1810 pthread_mutex_lock(&ndbcluster_mutex);
1811 /* ndb_share reference binlog free */
1812 DBUG_PRINT("NDB_SHARE", ("%s binlog free use_count: %u",
1813 share->key, share->use_count));
1814 free_share(&share, TRUE);
1815 if (is_remote_change && share && share->state != NSS_DROPPED)
1817 DBUG_PRINT("info", ("remote change"));
1818 share->state= NSS_DROPPED;
1819 if (share->use_count != 1)
1821 /* open handler holding reference */
1822 /* wait with freeing create ndb_share to below */
1823 do_close_cached_tables= TRUE;
1825 else
1827 /* ndb_share reference create free */
1828 DBUG_PRINT("NDB_SHARE", ("%s create free use_count: %u",
1829 share->key, share->use_count));
1830 free_share(&share, TRUE);
1831 share= 0;
1834 else
1835 share= 0;
1836 pthread_mutex_unlock(&ndbcluster_mutex);
1838 pOp->setCustomData(0);
1840 pthread_mutex_lock(&injector_mutex);
1841 ndb->dropEventOperation(pOp);
1842 pOp= 0;
1843 pthread_mutex_unlock(&injector_mutex);
1845 if (do_close_cached_tables)
1847 TABLE_LIST table_list;
1848 bzero((char*) &table_list,sizeof(table_list));
1849 table_list.db= (char *)dbname;
1850 table_list.alias= table_list.table_name= (char *)tabname;
1851 close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE);
1852 /* ndb_share reference create free */
1853 DBUG_PRINT("NDB_SHARE", ("%s create free use_count: %u",
1854 share->key, share->use_count));
1855 free_share(&share);
1857 DBUG_RETURN(0);
1860 static void ndb_binlog_query(THD *thd, Cluster_schema *schema)
1862 if (schema->any_value & NDB_ANYVALUE_RESERVED)
1864 if (schema->any_value != NDB_ANYVALUE_FOR_NOLOGGING)
1865 sql_print_warning("NDB: unknown value for binlog signalling 0x%X, "
1866 "query not logged",
1867 schema->any_value);
1868 return;
1870 uint32 thd_server_id_save= thd->server_id;
1871 DBUG_ASSERT(sizeof(thd_server_id_save) == sizeof(thd->server_id));
1872 char *thd_db_save= thd->db;
1873 if (schema->any_value == 0)
1874 thd->server_id= ::server_id;
1875 else
1876 thd->server_id= schema->any_value;
1877 thd->db= schema->db;
1878 int errcode = query_error_code(thd, thd->killed == THD::NOT_KILLED);
1879 thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query,
1880 schema->query_length, FALSE,
1881 schema->name[0] == 0 || thd->db[0] == 0,
1882 errcode);
1883 thd->server_id= thd_server_id_save;
1884 thd->db= thd_db_save;
/*
  ndb_binlog_thread_handle_schema_event

  Process one event received on the event operation pOp. Only events whose
  custom data is the ndb_schema_share are handled; anything else falls
  through to the final return.

  TE_UPDATE and TE_INSERT events:
  - A Cluster_schema is allocated with sql_alloc and filled by
    ndbcluster_get_schema; the any value of the event is stored in it.
  - SOT_CLEAR_SLOCK rows are queued on post_epoch_log_list and the
    function returns.
  - Rows written by another node (schema->node_id differs from the local
    node id) are acted on according to their type: either queued on
    post_epoch_log_list, or applied locally (cache invalidation, table
    discovery, running the query) and, when log_query is set and
    ndb_binlog_running is true, written to the binlog via ndb_binlog_query.
  - If the bit of the local node is set in the slock of the row, the
    acknowledgement is either queued on post_epoch_unlock_list
    (post_epoch_unlock set) or done at once with ndbcluster_update_slock.

  Other event types:
  - TE_DELETE is ignored.
  - TE_CLUSTER_FAILURE and TE_DROP release ndb_schema_share, reset
    ndb_binlog_tables_inited, close cached tables and then continue into
    the TE_ALTER case, which calls ndb_handle_schema_change.
  - TE_NODE_FAILURE, TE_SUBSCRIBE and TE_UNSUBSCRIBE update the
    subscriber bitmap of the share under its mutex and signal
    injector_cond.
  - Any other type is reported with sql_print_error and ignored.

  Parameters:
    thd                     thread used for running queries and closing tables
    ndb                     passed on to ndb_handle_schema_change
    pOp                     event operation carrying the event
    post_epoch_log_list     receives schema rows to handle after the epoch
    post_epoch_unlock_list  receives schema rows to acknowledge after the epoch
    mem_root                passed to push_back for both lists

  Returns 0 on all visible paths.

  NOTE(review): the result of sql_alloc is used without a null test.
*/
1887 static int
1888 ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
1889 NdbEventOperation *pOp,
1890 List<Cluster_schema>
1891 *post_epoch_log_list,
1892 List<Cluster_schema>
1893 *post_epoch_unlock_list,
1894 MEM_ROOT *mem_root)
1896 DBUG_ENTER("ndb_binlog_thread_handle_schema_event");
1897 NDB_SHARE *tmp_share= (NDB_SHARE *)pOp->getCustomData();
/* Only events that belong to the schema share are processed */
1898 if (tmp_share && ndb_schema_share == tmp_share)
1900 NDBEVENT::TableEvent ev_type= pOp->getEventType();
1901 DBUG_PRINT("enter", ("%s.%s ev_type: %d",
1902 tmp_share->db, tmp_share->table_name, ev_type));
1903 if (ev_type == NDBEVENT::TE_UPDATE ||
1904 ev_type == NDBEVENT::TE_INSERT)
1906 Cluster_schema *schema= (Cluster_schema *)
1907 sql_alloc(sizeof(Cluster_schema));
/* slock is a bitmap view over the slock bytes stored in the schema row */
1908 MY_BITMAP slock;
1909 bitmap_init(&slock, schema->slock, 8*SCHEMA_SLOCK_SIZE, FALSE);
1910 uint node_id= g_ndb_cluster_connection->node_id();
1912 ndbcluster_get_schema(tmp_share, schema);
1913 schema->any_value= pOp->getAnyValue();
1915 enum SCHEMA_OP_TYPE schema_type= (enum SCHEMA_OP_TYPE)schema->type;
1916 DBUG_PRINT("info",
1917 ("%s.%s: log query_length: %d query: '%s' type: %d",
1918 schema->db, schema->name,
1919 schema->query_length, schema->query,
1920 schema_type));
1921 if (schema_type == SOT_CLEAR_SLOCK)
1924 handle slock after epoch is completed to ensure that
1925 schema events get inserted in the binlog after any data
1926 events
1928 post_epoch_log_list->push_back(schema, mem_root);
1929 DBUG_RETURN(0);
/* Rows written by this node itself are not acted on here */
1931 if (schema->node_id != node_id)
1933 int log_query= 0, post_epoch_unlock= 0;
1934 switch (schema_type)
1936 case SOT_DROP_TABLE:
1937 // fall through
1938 case SOT_RENAME_TABLE:
1939 // fall through
1940 case SOT_RENAME_TABLE_NEW:
1941 // fall through
1942 case SOT_ALTER_TABLE:
1943 post_epoch_log_list->push_back(schema, mem_root);
1944 /* acknowledge this query _after_ epoch completion */
1945 post_epoch_unlock= 1;
1946 break;
1947 case SOT_TRUNCATE_TABLE:
1949 char key[FN_REFLEN + 1];
1950 build_table_filename(key, sizeof(key) - 1,
1951 schema->db, schema->name, "", 0);
1952 /* ndb_share reference temporary, free below */
1953 NDB_SHARE *share= get_share(key, 0, FALSE, FALSE);
1954 if (share)
1956 DBUG_PRINT("NDB_SHARE", ("%s temporary use_count: %u",
1957 share->key, share->use_count));
1959 // invalidation already handled by binlog thread
1960 if (!share || !share->op)
1963 injector_ndb->setDatabaseName(schema->db);
1964 Ndb_table_guard ndbtab_g(injector_ndb->getDictionary(),
1965 schema->name);
1966 ndbtab_g.invalidate();
1968 TABLE_LIST table_list;
1969 bzero((char*) &table_list,sizeof(table_list));
1970 table_list.db= schema->db;
1971 table_list.alias= table_list.table_name= schema->name;
1972 close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE);
1974 /* ndb_share reference temporary free */
1975 if (share)
1977 DBUG_PRINT("NDB_SHARE", ("%s temporary free use_count: %u",
1978 share->key, share->use_count));
1979 free_share(&share);
1982 // fall through
1983 case SOT_CREATE_TABLE:
/* Table discovery below runs with LOCK_open held */
1984 pthread_mutex_lock(&LOCK_open);
1985 if (ndbcluster_check_if_local_table(schema->db, schema->name))
1987 DBUG_PRINT("info", ("NDB Binlog: Skipping locally defined table '%s.%s'",
1988 schema->db, schema->name));
1989 sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' from "
1990 "binlog schema event '%s' from node %d. ",
1991 schema->db, schema->name, schema->query,
1992 schema->node_id);
1994 else if (ndb_create_table_from_engine(thd, schema->db, schema->name))
1996 sql_print_error("NDB Binlog: Could not discover table '%s.%s' from "
1997 "binlog schema event '%s' from node %d. "
1998 "my_errno: %d",
1999 schema->db, schema->name, schema->query,
2000 schema->node_id, my_errno);
2001 List_iterator_fast<MYSQL_ERROR> it(thd->warn_list);
2002 MYSQL_ERROR *err;
2003 while ((err= it++))
2004 sql_print_warning("NDB Binlog: (%d)%s", err->code, err->msg);
2006 pthread_mutex_unlock(&LOCK_open);
2007 log_query= 1;
2008 break;
2009 case SOT_DROP_DB:
2010 /* Drop the database locally if it only contains ndb tables */
2011 if (! ndbcluster_check_if_local_tables_in_db(thd, schema->db))
2013 const int no_print_error[1]= {0};
2014 run_query(thd, schema->query,
2015 schema->query + schema->query_length,
2016 no_print_error, /* print error */
2017 TRUE); /* don't binlog the query */
2018 /* binlog dropping database after any table operations */
2019 post_epoch_log_list->push_back(schema, mem_root);
2020 /* acknowledge this query _after_ epoch completion */
2021 post_epoch_unlock= 1;
2023 else
2025 /* Database contained local tables, leave it */
2026 sql_print_error("NDB Binlog: Skipping drop database '%s' since it contained local tables "
2027 "binlog schema event '%s' from node %d. ",
2028 schema->db, schema->query,
2029 schema->node_id);
2030 log_query= 1;
2032 break;
2033 case SOT_CREATE_DB:
2034 /* fall through */
2035 case SOT_ALTER_DB:
2037 const int no_print_error[1]= {0};
2038 run_query(thd, schema->query,
2039 schema->query + schema->query_length,
2040 no_print_error, /* print error */
2041 TRUE); /* don't binlog the query */
2042 log_query= 1;
2043 break;
2045 case SOT_TABLESPACE:
2046 case SOT_LOGFILE_GROUP:
2047 log_query= 1;
2048 break;
/* SOT_CLEAR_SLOCK already returned above, so reaching it here is fatal */
2049 case SOT_CLEAR_SLOCK:
2050 abort();
2052 if (log_query && ndb_binlog_running)
2053 ndb_binlog_query(thd, schema);
2054 /* signal that schema operation has been handled */
2055 DBUG_DUMP("slock", (uchar*) schema->slock, schema->slock_length);
/* Acknowledge only when the bit of this node is set in the slock */
2056 if (bitmap_is_set(&slock, node_id))
2058 if (post_epoch_unlock)
2059 post_epoch_unlock_list->push_back(schema, mem_root);
2060 else
2061 ndbcluster_update_slock(thd, schema->db, schema->name);
2064 DBUG_RETURN(0);
2067 the normal case of UPDATE/INSERT has already been handled
2069 switch (ev_type)
2071 case NDBEVENT::TE_DELETE:
2072 // skip
2073 break;
2074 case NDBEVENT::TE_CLUSTER_FAILURE:
2075 if (ndb_extra_logging)
2076 sql_print_information("NDB Binlog: cluster failure for %s at epoch %u.",
2077 ndb_schema_share->key, (unsigned) pOp->getGCI());
2078 // fall through
2079 case NDBEVENT::TE_DROP:
2080 if (ndb_extra_logging &&
2081 ndb_binlog_tables_inited && ndb_binlog_running)
2082 sql_print_information("NDB Binlog: ndb tables initially "
2083 "read only on reconnect.");
2085 /* begin protect ndb_schema_share */
2086 pthread_mutex_lock(&ndb_schema_share_mutex);
2087 /* ndb_share reference binlog extra free */
2088 DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
2089 ndb_schema_share->key,
2090 ndb_schema_share->use_count));
2091 free_share(&ndb_schema_share);
2092 ndb_schema_share= 0;
2093 ndb_binlog_tables_inited= 0;
2094 pthread_mutex_unlock(&ndb_schema_share_mutex);
2095 /* end protect ndb_schema_share */
2097 close_cached_tables(NULL, NULL, FALSE, FALSE, FALSE);
2098 // fall through
2099 case NDBEVENT::TE_ALTER:
2100 ndb_handle_schema_change(thd, ndb, pOp, tmp_share);
2101 break;
2102 case NDBEVENT::TE_NODE_FAILURE:
/* g_node_id_map turns the data node id into an index of subscriber_bitmap */
2104 uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
2105 DBUG_ASSERT(node_id != 0xFF);
2106 (void) pthread_mutex_lock(&tmp_share->mutex);
2107 bitmap_clear_all(&tmp_share->subscriber_bitmap[node_id]);
2108 DBUG_PRINT("info",("NODE_FAILURE UNSUBSCRIBE[%d]", node_id));
2109 if (ndb_extra_logging)
2111 sql_print_information("NDB Binlog: Node: %d, down,"
2112 " Subscriber bitmask %x%x",
2113 pOp->getNdbdNodeId(),
2114 tmp_share->subscriber_bitmap[node_id].bitmap[1],
2115 tmp_share->subscriber_bitmap[node_id].bitmap[0]);
2117 (void) pthread_mutex_unlock(&tmp_share->mutex);
2118 (void) pthread_cond_signal(&injector_cond);
2119 break;
2121 case NDBEVENT::TE_SUBSCRIBE:
2123 uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
2124 uint8 req_id= pOp->getReqNodeId();
2125 DBUG_ASSERT(req_id != 0 && node_id != 0xFF);
2126 (void) pthread_mutex_lock(&tmp_share->mutex);
2127 bitmap_set_bit(&tmp_share->subscriber_bitmap[node_id], req_id);
2128 DBUG_PRINT("info",("SUBSCRIBE[%d] %d", node_id, req_id));
2129 if (ndb_extra_logging)
2131 sql_print_information("NDB Binlog: Node: %d, subscribe from node %d,"
2132 " Subscriber bitmask %x%x",
2133 pOp->getNdbdNodeId(),
2134 req_id,
2135 tmp_share->subscriber_bitmap[node_id].bitmap[1],
2136 tmp_share->subscriber_bitmap[node_id].bitmap[0]);
2138 (void) pthread_mutex_unlock(&tmp_share->mutex);
2139 (void) pthread_cond_signal(&injector_cond);
2140 break;
2142 case NDBEVENT::TE_UNSUBSCRIBE:
2144 uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
2145 uint8 req_id= pOp->getReqNodeId();
2146 DBUG_ASSERT(req_id != 0 && node_id != 0xFF);
2147 (void) pthread_mutex_lock(&tmp_share->mutex);
2148 bitmap_clear_bit(&tmp_share->subscriber_bitmap[node_id], req_id);
2149 DBUG_PRINT("info",("UNSUBSCRIBE[%d] %d", node_id, req_id));
2150 if (ndb_extra_logging)
2152 sql_print_information("NDB Binlog: Node: %d, unsubscribe from node %d,"
2153 " Subscriber bitmask %x%x",
2154 pOp->getNdbdNodeId(),
2155 req_id,
2156 tmp_share->subscriber_bitmap[node_id].bitmap[1],
2157 tmp_share->subscriber_bitmap[node_id].bitmap[0]);
2159 (void) pthread_mutex_unlock(&tmp_share->mutex);
2160 (void) pthread_cond_signal(&injector_cond);
2161 break;
2163 default:
2164 sql_print_error("NDB Binlog: unknown non data event %d for %s. "
2165 "Ignoring...", (unsigned) ev_type, tmp_share->key);
2168 DBUG_RETURN(0);
2172 process any operations that should be done after
2173 the epoch is complete
2175 static void
2176 ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd,
2177 List<Cluster_schema>
2178 *post_epoch_log_list,
2179 List<Cluster_schema>
2180 *post_epoch_unlock_list)
2182 if (post_epoch_log_list->elements == 0)
2183 return;
2184 DBUG_ENTER("ndb_binlog_thread_handle_schema_event_post_epoch");
2185 Cluster_schema *schema;
2186 while ((schema= post_epoch_log_list->pop()))
2188 DBUG_PRINT("info",
2189 ("%s.%s: log query_length: %d query: '%s' type: %d",
2190 schema->db, schema->name,
2191 schema->query_length, schema->query,
2192 schema->type));
2193 int log_query= 0;
2195 enum SCHEMA_OP_TYPE schema_type= (enum SCHEMA_OP_TYPE)schema->type;
2196 char key[FN_REFLEN + 1];
2197 build_table_filename(key, sizeof(key) - 1, schema->db, schema->name, "", 0);
2198 if (schema_type == SOT_CLEAR_SLOCK)
2200 pthread_mutex_lock(&ndbcluster_mutex);
2201 NDB_SCHEMA_OBJECT *ndb_schema_object=
2202 (NDB_SCHEMA_OBJECT*) hash_search(&ndb_schema_objects,
2203 (uchar*) key, strlen(key));
2204 if (ndb_schema_object)
2206 pthread_mutex_lock(&ndb_schema_object->mutex);
2207 memcpy(ndb_schema_object->slock, schema->slock,
2208 sizeof(ndb_schema_object->slock));
2209 DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap",
2210 (uchar*)ndb_schema_object->slock_bitmap.bitmap,
2211 no_bytes_in_map(&ndb_schema_object->slock_bitmap));
2212 pthread_mutex_unlock(&ndb_schema_object->mutex);
2213 pthread_cond_signal(&injector_cond);
2215 pthread_mutex_unlock(&ndbcluster_mutex);
2216 continue;
2218 /* ndb_share reference temporary, free below */
2219 NDB_SHARE *share= get_share(key, 0, FALSE, FALSE);
2220 if (share)
2222 DBUG_PRINT("NDB_SHARE", ("%s temporary use_count: %u",
2223 share->key, share->use_count));
2225 switch (schema_type)
2227 case SOT_DROP_DB:
2228 log_query= 1;
2229 break;
2230 case SOT_DROP_TABLE:
2231 log_query= 1;
2232 // invalidation already handled by binlog thread
2233 if (share && share->op)
2235 break;
2237 // fall through
2238 case SOT_RENAME_TABLE:
2239 // fall through
2240 case SOT_ALTER_TABLE:
2241 // invalidation already handled by binlog thread
2242 if (!share || !share->op)
2245 injector_ndb->setDatabaseName(schema->db);
2246 Ndb_table_guard ndbtab_g(injector_ndb->getDictionary(),
2247 schema->name);
2248 ndbtab_g.invalidate();
2250 TABLE_LIST table_list;
2251 bzero((char*) &table_list,sizeof(table_list));
2252 table_list.db= schema->db;
2253 table_list.alias= table_list.table_name= schema->name;
2254 close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE);
2256 if (schema_type != SOT_ALTER_TABLE)
2257 break;
2258 // fall through
2259 case SOT_RENAME_TABLE_NEW:
2260 log_query= 1;
2261 if (ndb_binlog_running && (!share || !share->op))
2264 we need to free any share here as command below
2265 may need to call handle_trailing_share
2267 if (share)
2269 /* ndb_share reference temporary free */
2270 DBUG_PRINT("NDB_SHARE", ("%s temporary free use_count: %u",
2271 share->key, share->use_count));
2272 free_share(&share);
2273 share= 0;
2275 pthread_mutex_lock(&LOCK_open);
2276 if (ndbcluster_check_if_local_table(schema->db, schema->name))
2278 DBUG_PRINT("info", ("NDB Binlog: Skipping locally defined table '%s.%s'",
2279 schema->db, schema->name));
2280 sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' from "
2281 "binlog schema event '%s' from node %d. ",
2282 schema->db, schema->name, schema->query,
2283 schema->node_id);
2285 else if (ndb_create_table_from_engine(thd, schema->db, schema->name))
2287 sql_print_error("NDB Binlog: Could not discover table '%s.%s' from "
2288 "binlog schema event '%s' from node %d. my_errno: %d",
2289 schema->db, schema->name, schema->query,
2290 schema->node_id, my_errno);
2291 List_iterator_fast<MYSQL_ERROR> it(thd->warn_list);
2292 MYSQL_ERROR *err;
2293 while ((err= it++))
2294 sql_print_warning("NDB Binlog: (%d)%s", err->code, err->msg);
2296 pthread_mutex_unlock(&LOCK_open);
2298 break;
2299 default:
2300 DBUG_ASSERT(FALSE);
2302 if (share)
2304 /* ndb_share reference temporary free */
2305 DBUG_PRINT("NDB_SHARE", ("%s temporary free use_count: %u",
2306 share->key, share->use_count));
2307 free_share(&share);
2308 share= 0;
2311 if (ndb_binlog_running && log_query)
2312 ndb_binlog_query(thd, schema);
2314 while ((schema= post_epoch_unlock_list->pop()))
2316 ndbcluster_update_slock(thd, schema->db, schema->name);
2318 DBUG_VOID_RETURN;
2322 Timer class for doing performance measurements
/*********************************************************************
  Internal helper functions for handling of the cluster replication tables
  - ndb_binlog_index
  - ndb_apply_status
*********************************************************************/
2332 struct to hold the data to be inserted into the
2333 ndb_binlog_index table
2335 struct ndb_binlog_index_row {
2336 ulonglong gci;
2337 const char *master_log_file;
2338 ulonglong master_log_pos;
2339 ulonglong n_inserts;
2340 ulonglong n_updates;
2341 ulonglong n_deletes;
2342 ulonglong n_schemaops;
2346 Open the ndb_binlog_index table
2348 static int open_ndb_binlog_index(THD *thd, TABLE_LIST *tables,
2349 TABLE **ndb_binlog_index)
2351 static char repdb[]= NDB_REP_DB;
2352 static char reptable[]= NDB_REP_TABLE;
2353 const char *save_proc_info= thd->proc_info;
2355 bzero((char*) tables, sizeof(*tables));
2356 tables->db= repdb;
2357 tables->alias= tables->table_name= reptable;
2358 tables->lock_type= TL_WRITE;
2359 thd->proc_info= "Opening " NDB_REP_DB "." NDB_REP_TABLE;
2360 tables->required_type= FRMTYPE_TABLE;
2361 uint counter;
2362 thd->clear_error();
2363 if (open_tables(thd, &tables, &counter, MYSQL_LOCK_IGNORE_FLUSH))
2365 if (thd->killed)
2366 sql_print_error("NDB Binlog: Opening ndb_binlog_index: killed");
2367 else
2368 sql_print_error("NDB Binlog: Opening ndb_binlog_index: %d, '%s'",
2369 thd->main_da.sql_errno(),
2370 thd->main_da.message());
2371 thd->proc_info= save_proc_info;
2372 return -1;
2374 *ndb_binlog_index= tables->table;
2375 thd->proc_info= save_proc_info;
2376 (*ndb_binlog_index)->use_all_columns();
2377 return 0;
2382 Insert one row in the ndb_binlog_index
2385 int ndb_add_ndb_binlog_index(THD *thd, void *_row)
2387 ndb_binlog_index_row &row= *(ndb_binlog_index_row *) _row;
2388 int error= 0;
2389 bool need_reopen;
2391 Turn of binlogging to prevent the table changes to be written to
2392 the binary log.
2394 ulong saved_options= thd->options;
2395 thd->options&= ~(OPTION_BIN_LOG);
2397 for ( ; ; ) /* loop for need_reopen */
2399 if (!ndb_binlog_index && open_ndb_binlog_index(thd, &binlog_tables, &ndb_binlog_index))
2401 error= -1;
2402 goto add_ndb_binlog_index_err;
2405 if (lock_tables(thd, &binlog_tables, 1, &need_reopen))
2407 if (need_reopen)
2409 TABLE_LIST *p_binlog_tables= &binlog_tables;
2410 close_tables_for_reopen(thd, &p_binlog_tables);
2411 ndb_binlog_index= 0;
2412 continue;
2414 sql_print_error("NDB Binlog: Unable to lock table ndb_binlog_index");
2415 error= -1;
2416 goto add_ndb_binlog_index_err;
2418 break;
2422 Intialize ndb_binlog_index->record[0]
2424 empty_record(ndb_binlog_index);
2426 ndb_binlog_index->field[0]->store(row.master_log_pos);
2427 ndb_binlog_index->field[1]->store(row.master_log_file,
2428 strlen(row.master_log_file),
2429 &my_charset_bin);
2430 ndb_binlog_index->field[2]->store(row.gci);
2431 ndb_binlog_index->field[3]->store(row.n_inserts);
2432 ndb_binlog_index->field[4]->store(row.n_updates);
2433 ndb_binlog_index->field[5]->store(row.n_deletes);
2434 ndb_binlog_index->field[6]->store(row.n_schemaops);
2436 if ((error= ndb_binlog_index->file->ha_write_row(ndb_binlog_index->record[0])))
2438 sql_print_error("NDB Binlog: Writing row to ndb_binlog_index: %d", error);
2439 error= -1;
2440 goto add_ndb_binlog_index_err;
2443 mysql_unlock_tables(thd, thd->lock);
2444 thd->lock= 0;
2445 thd->options= saved_options;
2446 return 0;
2447 add_ndb_binlog_index_err:
2448 close_thread_tables(thd);
2449 ndb_binlog_index= 0;
2450 thd->options= saved_options;
2451 return error;
/*********************************************************************
  Functions for start, stop, wait for ndbcluster binlog thread
*********************************************************************/

/* Values taken by do_ndbcluster_binlog_close_connection */
enum Binlog_thread_state
{
  BCCC_running,   /* == 0 */
  BCCC_exit,      /* == 1 */
  BCCC_restart    /* == 2 */
};

static enum Binlog_thread_state do_ndbcluster_binlog_close_connection= BCCC_restart;
2467 int ndbcluster_binlog_start()
2469 DBUG_ENTER("ndbcluster_binlog_start");
2471 if (::server_id == 0)
2473 sql_print_warning("NDB: server id set to zero will cause any other mysqld "
2474 "with bin log to log with wrong server id");
2476 else if (::server_id & 0x1 << 31)
2478 sql_print_error("NDB: server id's with high bit set is reserved for internal "
2479 "purposes");
2480 DBUG_RETURN(-1);
2483 pthread_mutex_init(&injector_mutex, MY_MUTEX_INIT_FAST);
2484 pthread_cond_init(&injector_cond, NULL);
2485 pthread_mutex_init(&ndb_schema_share_mutex, MY_MUTEX_INIT_FAST);
2487 /* Create injector thread */
2488 if (pthread_create(&ndb_binlog_thread, &connection_attrib,
2489 ndb_binlog_thread_func, 0))
2491 DBUG_PRINT("error", ("Could not create ndb injector thread"));
2492 pthread_cond_destroy(&injector_cond);
2493 pthread_mutex_destroy(&injector_mutex);
2494 DBUG_RETURN(-1);
2497 ndbcluster_binlog_inited= 1;
2499 /* Wait for the injector thread to start */
2500 pthread_mutex_lock(&injector_mutex);
2501 while (!ndb_binlog_thread_running)
2502 pthread_cond_wait(&injector_cond, &injector_mutex);
2503 pthread_mutex_unlock(&injector_mutex);
2505 if (ndb_binlog_thread_running < 0)
2506 DBUG_RETURN(-1);
2508 DBUG_RETURN(0);
2512 /**************************************************************
2513 Internal helper functions for creating/dropping ndb events
2514 used by the client sql threads
2515 **************************************************************/
2516 void
2517 ndb_rep_event_name(String *event_name,const char *db, const char *tbl)
2519 event_name->set_ascii("REPL$", 5);
2520 event_name->append(db);
2521 if (tbl)
2523 event_name->append('/');
2524 event_name->append(tbl);
2528 bool
2529 ndbcluster_check_if_local_table(const char *dbname, const char *tabname)
2531 char key[FN_REFLEN + 1];
2532 char ndb_file[FN_REFLEN + 1];
2534 DBUG_ENTER("ndbcluster_check_if_local_table");
2535 build_table_filename(key, FN_LEN-1, dbname, tabname, reg_ext, 0);
2536 build_table_filename(ndb_file, FN_LEN-1, dbname, tabname, ha_ndb_ext, 0);
2537 /* Check that any defined table is an ndb table */
2538 DBUG_PRINT("info", ("Looking for file %s and %s", key, ndb_file));
2539 if ((! my_access(key, F_OK)) && my_access(ndb_file, F_OK))
2541 DBUG_PRINT("info", ("table file %s not on disk, local table", ndb_file));
2544 DBUG_RETURN(true);
2547 DBUG_RETURN(false);
2550 bool
2551 ndbcluster_check_if_local_tables_in_db(THD *thd, const char *dbname)
2553 DBUG_ENTER("ndbcluster_check_if_local_tables_in_db");
2554 DBUG_PRINT("info", ("Looking for files in directory %s", dbname));
2555 LEX_STRING *tabname;
2556 List<LEX_STRING> files;
2557 char path[FN_REFLEN + 1];
2559 build_table_filename(path, sizeof(path) - 1, dbname, "", "", 0);
2560 if (find_files(thd, &files, dbname, path, NullS, 0) != FIND_FILES_OK)
2562 DBUG_PRINT("info", ("Failed to find files"));
2563 DBUG_RETURN(true);
2565 DBUG_PRINT("info",("found: %d files", files.elements));
2566 while ((tabname= files.pop()))
2568 DBUG_PRINT("info", ("Found table %s", tabname->str));
2569 if (ndbcluster_check_if_local_table(dbname, tabname->str))
2570 DBUG_RETURN(true);
2573 DBUG_RETURN(false);
2577 Common function for setting up everything for logging a table at
2578 create/discover.
2580 int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
2581 uint key_len,
2582 const char *db,
2583 const char *table_name,
2584 my_bool share_may_exist)
2586 int do_event_op= ndb_binlog_running;
2587 DBUG_ENTER("ndbcluster_create_binlog_setup");
2588 DBUG_PRINT("enter",("key: %s key_len: %d %s.%s share_may_exist: %d",
2589 key, key_len, db, table_name, share_may_exist));
2590 DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(table_name));
2591 DBUG_ASSERT(strlen(key) == key_len);
2593 pthread_mutex_lock(&ndbcluster_mutex);
2595 /* Handle any trailing share */
2596 NDB_SHARE *share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables,
2597 (uchar*) key, key_len);
2599 if (share && share_may_exist)
2601 if (share->flags & NSF_NO_BINLOG ||
2602 share->op != 0 ||
2603 share->op_old != 0)
2605 pthread_mutex_unlock(&ndbcluster_mutex);
2606 DBUG_RETURN(0); // replication already setup, or should not
2610 if (share)
2612 if (share->op || share->op_old)
2614 my_errno= HA_ERR_TABLE_EXIST;
2615 pthread_mutex_unlock(&ndbcluster_mutex);
2616 DBUG_RETURN(1);
2618 if (!share_may_exist || share->connect_count !=
2619 g_ndb_cluster_connection->get_connect_count())
2621 handle_trailing_share(share);
2622 share= NULL;
2626 /* Create share which is needed to hold replication information */
2627 if (share)
2629 /* ndb_share reference create */
2630 ++share->use_count;
2631 DBUG_PRINT("NDB_SHARE", ("%s create use_count: %u",
2632 share->key, share->use_count));
2634 /* ndb_share reference create */
2635 else if (!(share= get_share(key, 0, TRUE, TRUE)))
2637 sql_print_error("NDB Binlog: "
2638 "allocating table share for %s failed", key);
2640 else
2642 DBUG_PRINT("NDB_SHARE", ("%s create use_count: %u",
2643 share->key, share->use_count));
2646 if (!ndb_schema_share &&
2647 strcmp(share->db, NDB_REP_DB) == 0 &&
2648 strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
2649 do_event_op= 1;
2650 else if (!ndb_apply_status_share &&
2651 strcmp(share->db, NDB_REP_DB) == 0 &&
2652 strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
2653 do_event_op= 1;
2655 if (!do_event_op)
2657 share->flags|= NSF_NO_BINLOG;
2658 pthread_mutex_unlock(&ndbcluster_mutex);
2659 DBUG_RETURN(0);
2661 pthread_mutex_unlock(&ndbcluster_mutex);
2663 while (share && !IS_TMP_PREFIX(table_name))
2666 ToDo make sanity check of share so that the table is actually the same
2667 I.e. we need to do open file from frm in this case
2668 Currently awaiting this to be fixed in the 4.1 tree in the general
2669 case
2672 /* Create the event in NDB */
2673 ndb->setDatabaseName(db);
2675 NDBDICT *dict= ndb->getDictionary();
2676 Ndb_table_guard ndbtab_g(dict, table_name);
2677 const NDBTAB *ndbtab= ndbtab_g.get_table();
2678 if (ndbtab == 0)
2680 if (ndb_extra_logging)
2681 sql_print_information("NDB Binlog: Failed to get table %s from ndb: "
2682 "%s, %d", key, dict->getNdbError().message,
2683 dict->getNdbError().code);
2684 break; // error
2686 String event_name(INJECTOR_EVENT_LEN);
2687 ndb_rep_event_name(&event_name, db, table_name);
2689 event should have been created by someone else,
2690 but let's make sure, and create if it doesn't exist
2692 const NDBEVENT *ev= dict->getEvent(event_name.c_ptr());
2693 if (!ev)
2695 if (ndbcluster_create_event(ndb, ndbtab, event_name.c_ptr(), share))
2697 sql_print_error("NDB Binlog: "
2698 "FAILED CREATE (DISCOVER) TABLE Event: %s",
2699 event_name.c_ptr());
2700 break; // error
2702 if (ndb_extra_logging)
2703 sql_print_information("NDB Binlog: "
2704 "CREATE (DISCOVER) TABLE Event: %s",
2705 event_name.c_ptr());
2707 else
2709 delete ev;
2710 if (ndb_extra_logging)
2711 sql_print_information("NDB Binlog: DISCOVER TABLE Event: %s",
2712 event_name.c_ptr());
2716 create the event operations for receiving logging events
2718 if (ndbcluster_create_event_ops(share, ndbtab, event_name.c_ptr()))
2720 sql_print_error("NDB Binlog:"
2721 "FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s",
2722 event_name.c_ptr());
2723 /* a warning has been issued to the client */
2724 DBUG_RETURN(0);
2726 DBUG_RETURN(0);
2728 DBUG_RETURN(-1);
2732 ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
2733 const char *event_name, NDB_SHARE *share,
2734 int push_warning)
2736 THD *thd= current_thd;
2737 DBUG_ENTER("ndbcluster_create_event");
2738 DBUG_PRINT("info", ("table=%s version=%d event=%s share=%s",
2739 ndbtab->getName(), ndbtab->getObjectVersion(),
2740 event_name, share ? share->key : "(nil)"));
2741 DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
2742 if (!share)
2744 DBUG_PRINT("info", ("share == NULL"));
2745 DBUG_RETURN(0);
2747 if (share->flags & NSF_NO_BINLOG)
2749 DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x %d",
2750 share->flags, share->flags & NSF_NO_BINLOG));
2751 DBUG_RETURN(0);
2754 NDBDICT *dict= ndb->getDictionary();
2755 NDBEVENT my_event(event_name);
2756 my_event.setTable(*ndbtab);
2757 my_event.addTableEvent(NDBEVENT::TE_ALL);
2758 if (share->flags & NSF_HIDDEN_PK)
2760 if (share->flags & NSF_BLOB_FLAG)
2762 sql_print_error("NDB Binlog: logging of table %s "
2763 "with BLOB attribute and no PK is not supported",
2764 share->key);
2765 if (push_warning)
2766 push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
2767 ER_ILLEGAL_HA_CREATE_OPTION,
2768 ER(ER_ILLEGAL_HA_CREATE_OPTION),
2769 ndbcluster_hton_name,
2770 "Binlog of table with BLOB attribute and no PK");
2772 share->flags|= NSF_NO_BINLOG;
2773 DBUG_RETURN(-1);
2775 /* No primary key, subscribe for all attributes */
2776 my_event.setReport(NDBEVENT::ER_ALL);
2777 DBUG_PRINT("info", ("subscription all"));
2779 else
2781 if (ndb_schema_share || strcmp(share->db, NDB_REP_DB) ||
2782 strcmp(share->table_name, NDB_SCHEMA_TABLE))
2784 my_event.setReport(NDBEVENT::ER_UPDATED);
2785 DBUG_PRINT("info", ("subscription only updated"));
2787 else
2789 my_event.setReport((NDBEVENT::EventReport)
2790 (NDBEVENT::ER_ALL | NDBEVENT::ER_SUBSCRIBE));
2791 DBUG_PRINT("info", ("subscription all and subscribe"));
2794 if (share->flags & NSF_BLOB_FLAG)
2795 my_event.mergeEvents(TRUE);
2797 /* add all columns to the event */
2798 int n_cols= ndbtab->getNoOfColumns();
2799 for(int a= 0; a < n_cols; a++)
2800 my_event.addEventColumn(a);
2802 if (dict->createEvent(my_event)) // Add event to database
2804 if (dict->getNdbError().classification != NdbError::SchemaObjectExists)
2807 failed, print a warning
2809 if (push_warning > 1)
2810 push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
2811 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
2812 dict->getNdbError().code,
2813 dict->getNdbError().message, "NDB");
2814 sql_print_error("NDB Binlog: Unable to create event in database. "
2815 "Event: %s Error Code: %d Message: %s", event_name,
2816 dict->getNdbError().code, dict->getNdbError().message);
2817 DBUG_RETURN(-1);
2821 try retrieving the event, if table version/id matches, we will get
2822 a valid event. Otherwise we have a trailing event from before
2824 const NDBEVENT *ev;
2825 if ((ev= dict->getEvent(event_name)))
2827 delete ev;
2828 DBUG_RETURN(0);
2832 trailing event from before; an error, but try to correct it
2834 if (dict->getNdbError().code == NDB_INVALID_SCHEMA_OBJECT &&
2835 dict->dropEvent(my_event.getName()))
2837 if (push_warning > 1)
2838 push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
2839 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
2840 dict->getNdbError().code,
2841 dict->getNdbError().message, "NDB");
2842 sql_print_error("NDB Binlog: Unable to create event in database. "
2843 " Attempt to correct with drop failed. "
2844 "Event: %s Error Code: %d Message: %s",
2845 event_name,
2846 dict->getNdbError().code,
2847 dict->getNdbError().message);
2848 DBUG_RETURN(-1);
2852 try to add the event again
2854 if (dict->createEvent(my_event))
2856 if (push_warning > 1)
2857 push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
2858 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
2859 dict->getNdbError().code,
2860 dict->getNdbError().message, "NDB");
2861 sql_print_error("NDB Binlog: Unable to create event in database. "
2862 " Attempt to correct with drop ok, but create failed. "
2863 "Event: %s Error Code: %d Message: %s",
2864 event_name,
2865 dict->getNdbError().code,
2866 dict->getNdbError().message);
2867 DBUG_RETURN(-1);
2869 #ifdef NDB_BINLOG_EXTRA_WARNINGS
2870 push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
2871 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
2872 0, "NDB Binlog: Removed trailing event",
2873 "NDB");
2874 #endif
2877 DBUG_RETURN(0);
2880 inline int is_ndb_compatible_type(Field *field)
2882 return
2883 !(field->flags & BLOB_FLAG) &&
2884 field->type() != MYSQL_TYPE_BIT &&
2885 field->pack_length() != 0;
2889 - create eventOperations for receiving log events
2890 - setup ndb recattrs for reception of log event data
2891 - "start" the event operation
2893 used at create/discover of tables
2896 ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
2897 const char *event_name)
2899 THD *thd= current_thd;
2901 we are in either create table or rename table so table should be
2902 locked, hence we can work with the share without locks
2905 DBUG_ENTER("ndbcluster_create_event_ops");
2906 DBUG_PRINT("enter", ("table: %s event: %s", ndbtab->getName(), event_name));
2907 DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
2909 DBUG_ASSERT(share != 0);
2911 if (share->flags & NSF_NO_BINLOG)
2913 DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x",
2914 share->flags));
2915 DBUG_RETURN(0);
2918 int do_ndb_schema_share= 0, do_ndb_apply_status_share= 0;
2919 if (!ndb_schema_share && strcmp(share->db, NDB_REP_DB) == 0 &&
2920 strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
2921 do_ndb_schema_share= 1;
2922 else if (!ndb_apply_status_share && strcmp(share->db, NDB_REP_DB) == 0 &&
2923 strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
2924 do_ndb_apply_status_share= 1;
2925 else if (!binlog_filter->db_ok(share->db) || !ndb_binlog_running)
2927 share->flags|= NSF_NO_BINLOG;
2928 DBUG_RETURN(0);
2931 if (share->op)
2933 assert(share->op->getCustomData() == (void *) share);
2935 DBUG_ASSERT(share->use_count > 1);
2936 sql_print_error("NDB Binlog: discover reusing old ev op");
2937 /* ndb_share reference ToDo free */
2938 DBUG_PRINT("NDB_SHARE", ("%s ToDo free use_count: %u",
2939 share->key, share->use_count));
2940 free_share(&share); // old event op already has reference
2941 DBUG_RETURN(0);
2944 TABLE *table= share->table;
2946 int retries= 100;
2948 100 milliseconds, temporary error on schema operation can
2949 take some time to be resolved
2951 int retry_sleep= 100;
2952 while (1)
2954 pthread_mutex_lock(&injector_mutex);
2955 Ndb *ndb= injector_ndb;
2956 if (do_ndb_schema_share)
2957 ndb= schema_ndb;
2959 if (ndb == 0)
2961 pthread_mutex_unlock(&injector_mutex);
2962 DBUG_RETURN(-1);
2965 NdbEventOperation* op;
2966 if (do_ndb_schema_share)
2967 op= ndb->createEventOperation(event_name);
2968 else
2970 // set injector_ndb database/schema from table internal name
2971 int ret= ndb->setDatabaseAndSchemaName(ndbtab);
2972 assert(ret == 0);
2973 op= ndb->createEventOperation(event_name);
2974 // reset to catch errors
2975 ndb->setDatabaseName("");
2977 if (!op)
2979 sql_print_error("NDB Binlog: Creating NdbEventOperation failed for"
2980 " %s",event_name);
2981 push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
2982 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
2983 ndb->getNdbError().code,
2984 ndb->getNdbError().message,
2985 "NDB");
2986 pthread_mutex_unlock(&injector_mutex);
2987 DBUG_RETURN(-1);
2990 if (share->flags & NSF_BLOB_FLAG)
2991 op->mergeEvents(TRUE); // currently not inherited from event
2993 DBUG_PRINT("info", ("share->ndb_value[0]: 0x%lx share->ndb_value[1]: 0x%lx",
2994 (long) share->ndb_value[0],
2995 (long) share->ndb_value[1]));
2996 int n_columns= ndbtab->getNoOfColumns();
2997 int n_fields= table ? table->s->fields : 0; // XXX ???
2998 for (int j= 0; j < n_columns; j++)
3000 const char *col_name= ndbtab->getColumn(j)->getName();
3001 NdbValue attr0, attr1;
3002 if (j < n_fields)
3004 Field *f= share->table->field[j];
3005 if (is_ndb_compatible_type(f))
3007 DBUG_PRINT("info", ("%s compatible", col_name));
3008 attr0.rec= op->getValue(col_name, (char*) f->ptr);
3009 attr1.rec= op->getPreValue(col_name,
3010 (f->ptr - share->table->record[0]) +
3011 (char*) share->table->record[1]);
3013 else if (! (f->flags & BLOB_FLAG))
3015 DBUG_PRINT("info", ("%s non compatible", col_name));
3016 attr0.rec= op->getValue(col_name);
3017 attr1.rec= op->getPreValue(col_name);
3019 else
3021 DBUG_PRINT("info", ("%s blob", col_name));
3022 DBUG_ASSERT(share->flags & NSF_BLOB_FLAG);
3023 attr0.blob= op->getBlobHandle(col_name);
3024 attr1.blob= op->getPreBlobHandle(col_name);
3025 if (attr0.blob == NULL || attr1.blob == NULL)
3027 sql_print_error("NDB Binlog: Creating NdbEventOperation"
3028 " blob field %u handles failed (code=%d) for %s",
3029 j, op->getNdbError().code, event_name);
3030 push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
3031 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
3032 op->getNdbError().code,
3033 op->getNdbError().message,
3034 "NDB");
3035 ndb->dropEventOperation(op);
3036 pthread_mutex_unlock(&injector_mutex);
3037 DBUG_RETURN(-1);
3041 else
3043 DBUG_PRINT("info", ("%s hidden key", col_name));
3044 attr0.rec= op->getValue(col_name);
3045 attr1.rec= op->getPreValue(col_name);
3047 share->ndb_value[0][j].ptr= attr0.ptr;
3048 share->ndb_value[1][j].ptr= attr1.ptr;
3049 DBUG_PRINT("info", ("&share->ndb_value[0][%d]: 0x%lx "
3050 "share->ndb_value[0][%d]: 0x%lx",
3051 j, (long) &share->ndb_value[0][j],
3052 j, (long) attr0.ptr));
3053 DBUG_PRINT("info", ("&share->ndb_value[1][%d]: 0x%lx "
3054 "share->ndb_value[1][%d]: 0x%lx",
3055 j, (long) &share->ndb_value[0][j],
3056 j, (long) attr1.ptr));
3058 op->setCustomData((void *) share); // set before execute
3059 share->op= op; // assign op in NDB_SHARE
3060 if (op->execute())
3062 share->op= NULL;
3063 retries--;
3064 if (op->getNdbError().status != NdbError::TemporaryError &&
3065 op->getNdbError().code != 1407)
3066 retries= 0;
3067 if (retries == 0)
3069 push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
3070 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
3071 op->getNdbError().code, op->getNdbError().message,
3072 "NDB");
3073 sql_print_error("NDB Binlog: ndbevent->execute failed for %s; %d %s",
3074 event_name,
3075 op->getNdbError().code, op->getNdbError().message);
3077 ndb->dropEventOperation(op);
3078 pthread_mutex_unlock(&injector_mutex);
3079 if (retries)
3081 my_sleep(retry_sleep);
3082 continue;
3084 DBUG_RETURN(-1);
3086 pthread_mutex_unlock(&injector_mutex);
3087 break;
3090 /* ndb_share reference binlog */
3091 get_share(share);
3092 DBUG_PRINT("NDB_SHARE", ("%s binlog use_count: %u",
3093 share->key, share->use_count));
3094 if (do_ndb_apply_status_share)
3096 /* ndb_share reference binlog extra */
3097 ndb_apply_status_share= get_share(share);
3098 DBUG_PRINT("NDB_SHARE", ("%s binlog extra use_count: %u",
3099 share->key, share->use_count));
3100 (void) pthread_cond_signal(&injector_cond);
3102 else if (do_ndb_schema_share)
3104 /* ndb_share reference binlog extra */
3105 ndb_schema_share= get_share(share);
3106 DBUG_PRINT("NDB_SHARE", ("%s binlog extra use_count: %u",
3107 share->key, share->use_count));
3108 (void) pthread_cond_signal(&injector_cond);
3111 DBUG_PRINT("info",("%s share->op: 0x%lx share->use_count: %u",
3112 share->key, (long) share->op, share->use_count));
3114 if (ndb_extra_logging)
3115 sql_print_information("NDB Binlog: logging %s", share->key);
3116 DBUG_RETURN(0);
3120 when entering the calling thread should have a share lock id share != 0
3121 then the injector thread will have one as well, i.e. share->use_count == 0
3122 (unless it has already dropped... then share->op == 0)
3125 ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
3126 NDB_SHARE *share, const char *type_str)
3128 DBUG_ENTER("ndbcluster_handle_drop_table");
3129 THD *thd= current_thd;
3131 NDBDICT *dict= ndb->getDictionary();
3132 if (event_name && dict->dropEvent(event_name))
3134 if (dict->getNdbError().code != 4710)
3136 /* drop event failed for some reason, issue a warning */
3137 push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
3138 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
3139 dict->getNdbError().code,
3140 dict->getNdbError().message, "NDB");
3141 /* error is not that the event did not exist */
3142 sql_print_error("NDB Binlog: Unable to drop event in database. "
3143 "Event: %s Error Code: %d Message: %s",
3144 event_name,
3145 dict->getNdbError().code,
3146 dict->getNdbError().message);
3147 /* ToDo; handle error? */
3148 if (share && share->op &&
3149 share->op->getState() == NdbEventOperation::EO_EXECUTING &&
3150 dict->getNdbError().mysql_code != HA_ERR_NO_CONNECTION)
3152 DBUG_ASSERT(FALSE);
3153 DBUG_RETURN(-1);
3158 if (share == 0 || share->op == 0)
3160 DBUG_RETURN(0);
3164 Syncronized drop between client thread and injector thread is
3165 neccessary in order to maintain ordering in the binlog,
3166 such that the drop occurs _after_ any inserts/updates/deletes.
3168 The penalty for this is that the drop table becomes slow.
3170 This wait is however not strictly neccessary to produce a binlog
3171 that is usable. However the slave does not currently handle
3172 these out of order, thus we are keeping the SYNC_DROP_ defined
3173 for now.
3175 const char *save_proc_info= thd->proc_info;
3176 #define SYNC_DROP_
3177 #ifdef SYNC_DROP_
3178 thd->proc_info= "Syncing ndb table schema operation and binlog";
3179 (void) pthread_mutex_lock(&share->mutex);
3180 safe_mutex_assert_owner(&LOCK_open);
3181 (void) pthread_mutex_unlock(&LOCK_open);
3182 int max_timeout= opt_ndb_sync_timeout;
3183 while (share->op)
3185 struct timespec abstime;
3186 set_timespec(abstime, 1);
3187 int ret= pthread_cond_timedwait(&injector_cond,
3188 &share->mutex,
3189 &abstime);
3190 if (thd->killed ||
3191 share->op == 0)
3192 break;
3193 if (ret)
3195 max_timeout--;
3196 if (max_timeout == 0)
3198 sql_print_error("NDB %s: %s timed out. Ignoring...",
3199 type_str, share->key);
3200 break;
3202 if (ndb_extra_logging)
3203 ndb_report_waiting(type_str, max_timeout,
3204 type_str, share->key);
3207 (void) pthread_mutex_lock(&LOCK_open);
3208 (void) pthread_mutex_unlock(&share->mutex);
3209 #else
3210 (void) pthread_mutex_lock(&share->mutex);
3211 share->op_old= share->op;
3212 share->op= 0;
3213 (void) pthread_mutex_unlock(&share->mutex);
3214 #endif
3215 thd->proc_info= save_proc_info;
3217 DBUG_RETURN(0);
/********************************************************************
  Internal helper functions for different events from the storage nodes
  used by the ndb injector thread
********************************************************************/
3227 Handle error states on events from the storage nodes
3229 static int ndb_binlog_thread_handle_error(Ndb *ndb, NdbEventOperation *pOp,
3230 ndb_binlog_index_row &row)
3232 NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData();
3233 DBUG_ENTER("ndb_binlog_thread_handle_error");
3235 int overrun= pOp->isOverrun();
3236 if (overrun)
3239 ToDo: this error should rather clear the ndb_binlog_index...
3240 and continue
3242 sql_print_error("NDB Binlog: Overrun in event buffer, "
3243 "this means we have dropped events. Cannot "
3244 "continue binlog for %s", share->key);
3245 pOp->clearError();
3246 DBUG_RETURN(-1);
3249 if (!pOp->isConsistent())
3252 ToDo: this error should rather clear the ndb_binlog_index...
3253 and continue
3255 sql_print_error("NDB Binlog: Not Consistent. Cannot "
3256 "continue binlog for %s. Error code: %d"
3257 " Message: %s", share->key,
3258 pOp->getNdbError().code,
3259 pOp->getNdbError().message);
3260 pOp->clearError();
3261 DBUG_RETURN(-1);
3263 sql_print_error("NDB Binlog: unhandled error %d for table %s",
3264 pOp->hasError(), share->key);
3265 pOp->clearError();
3266 DBUG_RETURN(0);
3269 static int
3270 ndb_binlog_thread_handle_non_data_event(THD *thd, Ndb *ndb,
3271 NdbEventOperation *pOp,
3272 ndb_binlog_index_row &row)
3274 NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData();
3275 NDBEVENT::TableEvent type= pOp->getEventType();
3277 switch (type)
3279 case NDBEVENT::TE_CLUSTER_FAILURE:
3280 if (ndb_extra_logging)
3281 sql_print_information("NDB Binlog: cluster failure for %s at epoch %u.",
3282 share->key, (unsigned) pOp->getGCI());
3283 if (ndb_apply_status_share == share)
3285 if (ndb_extra_logging &&
3286 ndb_binlog_tables_inited && ndb_binlog_running)
3287 sql_print_information("NDB Binlog: ndb tables initially "
3288 "read only on reconnect.");
3289 /* ndb_share reference binlog extra free */
3290 DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
3291 share->key, share->use_count));
3292 free_share(&ndb_apply_status_share);
3293 ndb_apply_status_share= 0;
3294 ndb_binlog_tables_inited= 0;
3296 DBUG_PRINT("error", ("CLUSTER FAILURE EVENT: "
3297 "%s received share: 0x%lx op: 0x%lx share op: 0x%lx "
3298 "op_old: 0x%lx",
3299 share->key, (long) share, (long) pOp,
3300 (long) share->op, (long) share->op_old));
3301 break;
3302 case NDBEVENT::TE_DROP:
3303 if (ndb_apply_status_share == share)
3305 if (ndb_extra_logging &&
3306 ndb_binlog_tables_inited && ndb_binlog_running)
3307 sql_print_information("NDB Binlog: ndb tables initially "
3308 "read only on reconnect.");
3309 /* ndb_share reference binlog extra free */
3310 DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
3311 share->key, share->use_count));
3312 free_share(&ndb_apply_status_share);
3313 ndb_apply_status_share= 0;
3314 ndb_binlog_tables_inited= 0;
3316 /* ToDo: remove printout */
3317 if (ndb_extra_logging)
3318 sql_print_information("NDB Binlog: drop table %s.", share->key);
3319 // fall through
3320 case NDBEVENT::TE_ALTER:
3321 row.n_schemaops++;
3322 DBUG_PRINT("info", ("TABLE %s EVENT: %s received share: 0x%lx op: 0x%lx "
3323 "share op: 0x%lx op_old: 0x%lx",
3324 type == NDBEVENT::TE_DROP ? "DROP" : "ALTER",
3325 share->key, (long) share, (long) pOp,
3326 (long) share->op, (long) share->op_old));
3327 break;
3328 case NDBEVENT::TE_NODE_FAILURE:
3329 /* fall through */
3330 case NDBEVENT::TE_SUBSCRIBE:
3331 /* fall through */
3332 case NDBEVENT::TE_UNSUBSCRIBE:
3333 /* ignore */
3334 return 0;
3335 default:
3336 sql_print_error("NDB Binlog: unknown non data event %d for %s. "
3337 "Ignoring...", (unsigned) type, share->key);
3338 return 0;
3341 ndb_handle_schema_change(thd, ndb, pOp, share);
3342 return 0;
3346 Handle data events from the storage nodes
3348 static int
3349 ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
3350 ndb_binlog_index_row &row,
3351 injector::transaction &trans)
3353 NDB_SHARE *share= (NDB_SHARE*) pOp->getCustomData();
3354 if (share == ndb_apply_status_share)
3355 return 0;
3357 uint32 originating_server_id= pOp->getAnyValue();
3358 if (originating_server_id == 0)
3359 originating_server_id= ::server_id;
3360 else if (originating_server_id & NDB_ANYVALUE_RESERVED)
3362 if (originating_server_id != NDB_ANYVALUE_FOR_NOLOGGING)
3363 sql_print_warning("NDB: unknown value for binlog signalling 0x%X, "
3364 "event not logged",
3365 originating_server_id);
3366 return 0;
3368 else if (!g_ndb_log_slave_updates)
3371 This event comes from a slave applier since it has an originating
3372 server id set. Since option to log slave updates is not set, skip it.
3374 return 0;
3377 TABLE *table= share->table;
3378 DBUG_ASSERT(trans.good());
3379 DBUG_ASSERT(table != 0);
3381 dbug_print_table("table", table);
3383 TABLE_SHARE *table_s= table->s;
3384 uint n_fields= table_s->fields;
3385 MY_BITMAP b;
3386 /* Potential buffer for the bitmap */
3387 uint32 bitbuf[128 / (sizeof(uint32) * 8)];
3388 bitmap_init(&b, n_fields <= sizeof(bitbuf) * 8 ? bitbuf : NULL,
3389 n_fields, FALSE);
3390 bitmap_set_all(&b);
3393 row data is already in table->record[0]
3394 As we told the NdbEventOperation to do this
3395 (saves moving data about many times)
3399 for now malloc/free blobs buffer each time
3400 TODO if possible share single permanent buffer with handlers
3402 uchar* blobs_buffer[2] = { 0, 0 };
3403 uint blobs_buffer_size[2] = { 0, 0 };
3405 switch(pOp->getEventType())
3407 case NDBEVENT::TE_INSERT:
3408 row.n_inserts++;
3409 DBUG_PRINT("info", ("INSERT INTO %s.%s",
3410 table_s->db.str, table_s->table_name.str));
3412 if (share->flags & NSF_BLOB_FLAG)
3414 my_ptrdiff_t ptrdiff= 0;
3415 IF_DBUG(int ret =) get_ndb_blobs_value(table, share->ndb_value[0],
3416 blobs_buffer[0],
3417 blobs_buffer_size[0],
3418 ptrdiff);
3419 DBUG_ASSERT(ret == 0);
3421 ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]);
3422 IF_DBUG(int ret=) trans.write_row(originating_server_id,
3423 injector::transaction::table(table,
3424 TRUE),
3425 &b, n_fields, table->record[0]);
3426 DBUG_ASSERT(ret == 0);
3428 break;
3429 case NDBEVENT::TE_DELETE:
3430 row.n_deletes++;
3431 DBUG_PRINT("info",("DELETE FROM %s.%s",
3432 table_s->db.str, table_s->table_name.str));
3435 table->record[0] contains only the primary key in this case
3436 since we do not have an after image
3438 int n;
3439 if (table->s->primary_key != MAX_KEY)
3440 n= 0; /*
3441 use the primary key only as it save time and space and
3442 it is the only thing needed to log the delete
3444 else
3445 n= 1; /*
3446 we use the before values since we don't have a primary key
3447 since the mysql server does not handle the hidden primary
3451 if (share->flags & NSF_BLOB_FLAG)
3453 my_ptrdiff_t ptrdiff= table->record[n] - table->record[0];
3454 IF_DBUG(int ret =) get_ndb_blobs_value(table, share->ndb_value[n],
3455 blobs_buffer[n],
3456 blobs_buffer_size[n],
3457 ptrdiff);
3458 DBUG_ASSERT(ret == 0);
3460 ndb_unpack_record(table, share->ndb_value[n], &b, table->record[n]);
3461 DBUG_EXECUTE("info", print_records(table, table->record[n]););
3462 IF_DBUG(int ret =) trans.delete_row(originating_server_id,
3463 injector::transaction::table(table,
3464 TRUE),
3465 &b, n_fields, table->record[n]);
3466 DBUG_ASSERT(ret == 0);
3468 break;
3469 case NDBEVENT::TE_UPDATE:
3470 row.n_updates++;
3471 DBUG_PRINT("info", ("UPDATE %s.%s",
3472 table_s->db.str, table_s->table_name.str));
3474 if (share->flags & NSF_BLOB_FLAG)
3476 my_ptrdiff_t ptrdiff= 0;
3477 IF_DBUG(int ret =) get_ndb_blobs_value(table, share->ndb_value[0],
3478 blobs_buffer[0],
3479 blobs_buffer_size[0],
3480 ptrdiff);
3481 DBUG_ASSERT(ret == 0);
3483 ndb_unpack_record(table, share->ndb_value[0],
3484 &b, table->record[0]);
3485 DBUG_EXECUTE("info", print_records(table, table->record[0]););
3486 if (table->s->primary_key != MAX_KEY)
3489 since table has a primary key, we can do a write
3490 using only after values
3492 trans.write_row(originating_server_id,
3493 injector::transaction::table(table, TRUE),
3494 &b, n_fields, table->record[0]);// after values
3496 else
3499 mysql server cannot handle the ndb hidden key and
3500 therefore needs the before image as well
3502 if (share->flags & NSF_BLOB_FLAG)
3504 my_ptrdiff_t ptrdiff= table->record[1] - table->record[0];
3505 IF_DBUG(int ret =) get_ndb_blobs_value(table, share->ndb_value[1],
3506 blobs_buffer[1],
3507 blobs_buffer_size[1],
3508 ptrdiff);
3509 DBUG_ASSERT(ret == 0);
3511 ndb_unpack_record(table, share->ndb_value[1], &b, table->record[1]);
3512 DBUG_EXECUTE("info", print_records(table, table->record[1]););
3513 IF_DBUG(int ret =) trans.update_row(originating_server_id,
3514 injector::transaction::table(table,
3515 TRUE),
3516 &b, n_fields,
3517 table->record[1], // before values
3518 table->record[0]);// after values
3519 DBUG_ASSERT(ret == 0);
3522 break;
3523 default:
3524 /* We should REALLY never get here. */
3525 DBUG_PRINT("info", ("default - uh oh, a brain exploded."));
3526 break;
3529 if (share->flags & NSF_BLOB_FLAG)
3531 my_free(blobs_buffer[0], MYF(MY_ALLOW_ZERO_PTR));
3532 my_free(blobs_buffer[1], MYF(MY_ALLOW_ZERO_PTR));
3535 return 0;
//#define RUN_NDB_BINLOG_TIMER
#ifdef RUN_NDB_BINLOG_TIMER
/*
  Minimal stopwatch built on gettimeofday(), only compiled when
  RUN_NDB_BINLOG_TIMER is defined. Construction starts the clock;
  elapsed_ms() reports the time between the last start() and stop(),
  with the microsecond part rounded up to whole milliseconds.
*/
class Timer
{
public:
  Timer() { start(); }
  void start() { gettimeofday(&m_start, 0); }
  void stop() { gettimeofday(&m_stop, 0); }
  ulong elapsed_ms()
  {
    const longlong sec_delta=
      (longlong) m_stop.tv_sec - (longlong) m_start.tv_sec;
    const longlong usec_delta=
      (longlong) m_stop.tv_usec - (longlong) m_start.tv_usec;
    return (ulong) (sec_delta * 1000 + (usec_delta + 999) / 1000);
  }
private:
  struct timeval m_start,m_stop;
};
#endif
3558 /****************************************************************
3559 Injector thread main loop
3560 ****************************************************************/
3562 static uchar *
3563 ndb_schema_objects_get_key(NDB_SCHEMA_OBJECT *schema_object,
3564 size_t *length,
3565 my_bool not_used __attribute__((unused)))
3567 *length= schema_object->key_length;
3568 return (uchar*) schema_object->key;
3571 static NDB_SCHEMA_OBJECT *ndb_get_schema_object(const char *key,
3572 my_bool create_if_not_exists,
3573 my_bool have_lock)
3575 NDB_SCHEMA_OBJECT *ndb_schema_object;
3576 uint length= (uint) strlen(key);
3577 DBUG_ENTER("ndb_get_schema_object");
3578 DBUG_PRINT("enter", ("key: '%s'", key));
3580 if (!have_lock)
3581 pthread_mutex_lock(&ndbcluster_mutex);
3582 while (!(ndb_schema_object=
3583 (NDB_SCHEMA_OBJECT*) hash_search(&ndb_schema_objects,
3584 (uchar*) key,
3585 length)))
3587 if (!create_if_not_exists)
3589 DBUG_PRINT("info", ("does not exist"));
3590 break;
3592 if (!(ndb_schema_object=
3593 (NDB_SCHEMA_OBJECT*) my_malloc(sizeof(*ndb_schema_object) + length + 1,
3594 MYF(MY_WME | MY_ZEROFILL))))
3596 DBUG_PRINT("info", ("malloc error"));
3597 break;
3599 ndb_schema_object->key= (char *)(ndb_schema_object+1);
3600 memcpy(ndb_schema_object->key, key, length + 1);
3601 ndb_schema_object->key_length= length;
3602 if (my_hash_insert(&ndb_schema_objects, (uchar*) ndb_schema_object))
3604 my_free((uchar*) ndb_schema_object, 0);
3605 break;
3607 pthread_mutex_init(&ndb_schema_object->mutex, MY_MUTEX_INIT_FAST);
3608 bitmap_init(&ndb_schema_object->slock_bitmap, ndb_schema_object->slock,
3609 sizeof(ndb_schema_object->slock)*8, FALSE);
3610 bitmap_clear_all(&ndb_schema_object->slock_bitmap);
3611 break;
3613 if (ndb_schema_object)
3615 ndb_schema_object->use_count++;
3616 DBUG_PRINT("info", ("use_count: %d", ndb_schema_object->use_count));
3618 if (!have_lock)
3619 pthread_mutex_unlock(&ndbcluster_mutex);
3620 DBUG_RETURN(ndb_schema_object);
3624 static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object,
3625 bool have_lock)
3627 DBUG_ENTER("ndb_free_schema_object");
3628 DBUG_PRINT("enter", ("key: '%s'", (*ndb_schema_object)->key));
3629 if (!have_lock)
3630 pthread_mutex_lock(&ndbcluster_mutex);
3631 if (!--(*ndb_schema_object)->use_count)
3633 DBUG_PRINT("info", ("use_count: %d", (*ndb_schema_object)->use_count));
3634 hash_delete(&ndb_schema_objects, (uchar*) *ndb_schema_object);
3635 pthread_mutex_destroy(&(*ndb_schema_object)->mutex);
3636 my_free((uchar*) *ndb_schema_object, MYF(0));
3637 *ndb_schema_object= 0;
3639 else
3641 DBUG_PRINT("info", ("use_count: %d", (*ndb_schema_object)->use_count));
3643 if (!have_lock)
3644 pthread_mutex_unlock(&ndbcluster_mutex);
3645 DBUG_VOID_RETURN;
// Entry point of the NDB binlog injector thread.
// Visible flow: set up a THD, create the schema Ndb object (s_ndb) and the
// injector Ndb object (i_ndb), publish them through the injector_* globals,
// wait for the server to start, then poll both objects for events and write
// them to the binlog through the injector until told to terminate. The code
// after the err label tears everything down again and ends the thread.
// NOTE(review): in this copy of the file every line carries a leading number,
// and lines that held only braces, comment delimiters or other very short
// tokens appear to be missing; only comments were added here, confirm the
// control structure against a complete copy of the source.
3649 pthread_handler_t ndb_binlog_thread_func(void *arg)
3651 THD *thd; /* needs to be first for thread_stack */
3652 Ndb *i_ndb= 0;
3653 Ndb *s_ndb= 0;
3654 Thd_ndb *thd_ndb=0;
3655 int ndb_update_ndb_binlog_index= 1;
3656 injector *inj= injector::instance();
3657 uint incident_id= 0;
3659 #ifdef RUN_NDB_BINLOG_TIMER
3660 Timer main_timer;
3661 #endif
// injector_mutex is taken here and released on each start-up outcome below
// (store_globals failure, Ndb object failure, or start-up completed).
3663 pthread_mutex_lock(&injector_mutex);
3665 Set up the Thread
3667 my_thread_init();
3668 DBUG_ENTER("ndb_binlog_thread");
3670 thd= new THD; /* note that contructor of THD uses DBUG_ */
3671 THD_CHECK_SENTRY(thd);
3673 /* We need to set thd->thread_id before thd->store_globals, or it will
3674 set an invalid value for thd->variables.pseudo_thread_id.
3676 pthread_mutex_lock(&LOCK_thread_count);
3677 thd->thread_id= thread_id++;
3678 pthread_mutex_unlock(&LOCK_thread_count);
3680 thd->thread_stack= (char*) &thd; /* remember where our stack is */
// Failure path: report -1 through ndb_binlog_thread_running, signal
// injector_cond and end the thread without going through the err label.
3681 if (thd->store_globals())
3683 thd->cleanup();
3684 delete thd;
3685 ndb_binlog_thread_running= -1;
3686 pthread_mutex_unlock(&injector_mutex);
3687 pthread_cond_signal(&injector_cond);
3689 DBUG_LEAVE; // Must match DBUG_ENTER()
3690 my_thread_end();
3691 pthread_exit(0);
3692 return NULL; // Avoid compiler warnings
3694 lex_start(thd);
3696 thd->init_for_queries();
3697 thd->command= COM_DAEMON;
3698 thd->system_thread= SYSTEM_THREAD_NDBCLUSTER_BINLOG;
3699 thd->version= refresh_version;
3700 thd->main_security_ctx.host_or_ip= "";
3701 thd->client_capabilities= 0;
3702 my_net_init(&thd->net, 0);
3703 thd->main_security_ctx.master_access= ~0;
3704 thd->main_security_ctx.priv_user= 0;
3707 Set up ndb binlog
3709 sql_print_information("Starting MySQL Cluster Binlog Thread");
3711 pthread_detach_this_thread();
3712 thd->real_id= pthread_self();
3713 pthread_mutex_lock(&LOCK_thread_count);
3714 threads.append(thd);
3715 pthread_mutex_unlock(&LOCK_thread_count);
3716 thd->lex->start_transaction_opt= 0;
// Ndb object used for polling schema events (published as schema_ndb below).
3718 if (!(s_ndb= new Ndb(g_ndb_cluster_connection, "")) ||
3719 s_ndb->init())
3721 sql_print_error("NDB Binlog: Getting Schema Ndb object failed");
3722 ndb_binlog_thread_running= -1;
3723 pthread_mutex_unlock(&injector_mutex);
3724 pthread_cond_signal(&injector_cond);
3725 goto err;
// Ndb object used for polling data events (published as injector_ndb below).
3728 // empty database
3729 if (!(i_ndb= new Ndb(g_ndb_cluster_connection, "")) ||
3730 i_ndb->init())
3732 sql_print_error("NDB Binlog: Getting Ndb object failed");
3733 ndb_binlog_thread_running= -1;
3734 pthread_mutex_unlock(&injector_mutex);
3735 pthread_cond_signal(&injector_cond);
3736 goto err;
3739 /* init hash for schema object distribution */
3740 (void) hash_init(&ndb_schema_objects, system_charset_info, 32, 0, 0,
3741 (hash_get_key)ndb_schema_objects_get_key, 0, 0);
3744 Expose global reference to our ndb object.
3746 Used by both sql client thread and binlog thread to interact
3747 with the storage
3748 pthread_mutex_lock(&injector_mutex);
// NOTE(review): the lock call on the line above looks like it belongs to the
// surrounding comment text in the complete source (the mutex is already held
// since the top of the function) - confirm.
3750 injector_thd= thd;
3751 injector_ndb= i_ndb;
3752 p_latest_trans_gci=
3753 injector_ndb->get_ndb_cluster_connection().get_latest_trans_gci();
3754 schema_ndb= s_ndb;
3756 if (opt_bin_log)
3758 ndb_binlog_running= TRUE;
3761 /* Thread start up completed */
3762 ndb_binlog_thread_running= 1;
3763 pthread_mutex_unlock(&injector_mutex);
3764 pthread_cond_signal(&injector_cond);
3767 wait for mysql server to start (so that the binlog is started
3768 and thus can receive the first GAP event)
// Re-checks once per second so that ndbcluster_terminating is noticed.
3770 pthread_mutex_lock(&LOCK_server_started);
3771 while (!mysqld_server_started)
3773 struct timespec abstime;
3774 set_timespec(abstime, 1);
3775 pthread_cond_timedwait(&COND_server_started, &LOCK_server_started,
3776 &abstime);
3777 if (ndbcluster_terminating)
3779 pthread_mutex_unlock(&LOCK_server_started);
3780 goto err;
3783 pthread_mutex_unlock(&LOCK_server_started);
// The restart label is the re-entry point used when the connection to the
// cluster has to be set up again (see the goto restart further down).
3784 restart:
3786 Main NDB Injector loop
// NOTE(review): both visible paths through this while body end in break, so
// it appears to act as a run-once block per (re)start - confirm against the
// complete source.
3788 while (ndb_binlog_running)
3791 check if it is the first log, if so we do not insert a GAP event
3792 as there is really no log to have a GAP in
3794 if (incident_id == 0)
3796 LOG_INFO log_info;
3797 mysql_bin_log.get_current_log(&log_info);
3798 int len= strlen(log_info.log_file_name);
3799 uint no= 0;
// NOTE(review): the sscanf below reads from log_file_name + len - 6, which
// assumes the name is at least 6 characters long - confirm that holds.
3800 if ((sscanf(log_info.log_file_name + len - 6, "%u", &no) == 1) &&
3801 no == 1)
3803 /* this is the fist log, so skip GAP event */
3804 break;
3809 Always insert a GAP event as we cannot know what has happened
3810 in the cluster while not being connected.
// incident_id selects the message: 0 on first pass, 1 after it is set below.
3812 LEX_STRING const msg[2]=
3814 { C_STRING_WITH_LEN("mysqld startup") },
3815 { C_STRING_WITH_LEN("cluster disconnect")}
3817 IF_DBUG(int error=)
3818 inj->record_incident(thd, INCIDENT_LOST_EVENTS, msg[incident_id]);
3819 DBUG_ASSERT(!error);
3820 break;
3822 incident_id= 1;
3824 thd->proc_info= "Waiting for ndbcluster to start";
// Wait until the schema share exists (and, when binlogging, the apply status
// share as well), waking up once per second to check for termination.
3826 pthread_mutex_lock(&injector_mutex);
3827 while (!ndb_schema_share ||
3828 (ndb_binlog_running && !ndb_apply_status_share))
3830 /* ndb not connected yet */
3831 struct timespec abstime;
3832 set_timespec(abstime, 1);
3833 pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
3834 if (ndbcluster_binlog_terminating)
3836 pthread_mutex_unlock(&injector_mutex);
3837 goto err;
3840 pthread_mutex_unlock(&injector_mutex);
// The Thd_ndb object is seized only once; it is kept across restarts.
3842 if (thd_ndb == NULL)
3844 DBUG_ASSERT(ndbcluster_hton->slot != ~(uint)0);
3845 if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
3847 sql_print_error("Could not allocate Thd_ndb object");
3848 goto err;
3850 set_thd_ndb(thd, thd_ndb);
3851 thd_ndb->options|= TNO_NO_LOG_SCHEMA_OP;
3852 thd->query_id= 0; // to keep valgrind quiet
3857 // wait for the first event
3858 thd->proc_info= "Waiting for first event from ndbcluster";
3859 int schema_res, res;
3860 Uint64 schema_gci;
// Poll the schema Ndb object until it reports an epoch that is non-zero and
// differs from the latest received binlog epoch.
3863 DBUG_PRINT("info", ("Waiting for the first event"));
3865 if (ndbcluster_binlog_terminating)
3866 goto err;
3868 schema_res= s_ndb->pollEvents(100, &schema_gci);
3869 } while (schema_gci == 0 || ndb_latest_received_binlog_epoch == schema_gci);
// When binlogging, let the injector Ndb object catch up to at least the
// schema epoch; the larger of the two epochs is kept in schema_gci.
3870 if (ndb_binlog_running)
3872 Uint64 gci= i_ndb->getLatestGCI();
3873 while (gci < schema_gci || gci == ndb_latest_received_binlog_epoch)
3875 if (ndbcluster_binlog_terminating)
3876 goto err;
3877 res= i_ndb->pollEvents(10, &gci);
3879 if (gci > schema_gci)
3881 schema_gci= gci;
3884 // now check that we have epochs consistant with what we had before the restart
3885 DBUG_PRINT("info", ("schema_res: %d schema_gci: %lu", schema_res,
3886 (long) schema_gci));
3888 i_ndb->flushIncompleteEvents(schema_gci);
3889 s_ndb->flushIncompleteEvents(schema_gci);
// An epoch older than the last handled one is reported as a cluster restart
// and all epoch bookkeeping is reset to 0.
3890 if (schema_gci < ndb_latest_handled_binlog_epoch)
3892 sql_print_error("NDB Binlog: cluster has been restarted --initial or with older filesystem. "
3893 "ndb_latest_handled_binlog_epoch: %u, while current epoch: %u. "
3894 "RESET MASTER should be issued. Resetting ndb_latest_handled_binlog_epoch.",
3895 (unsigned) ndb_latest_handled_binlog_epoch, (unsigned) schema_gci);
3896 *p_latest_trans_gci= 0;
3897 ndb_latest_handled_binlog_epoch= 0;
3898 ndb_latest_applied_binlog_epoch= 0;
3899 ndb_latest_received_binlog_epoch= 0;
3901 else if (ndb_latest_applied_binlog_epoch > 0)
3903 sql_print_warning("NDB Binlog: cluster has reconnected. "
3904 "Changes to the database that occured while "
3905 "disconnected will not be in the binlog");
3907 if (ndb_extra_logging)
3909 sql_print_information("NDB Binlog: starting log at epoch %u",
3910 (unsigned)schema_gci);
// thd->db is assigned both before and after open_ndb_binlog_index.
// NOTE(review): presumably the open call may change thd->db - confirm.
3915 static char db[]= "";
3916 thd->db= db;
3917 if (ndb_binlog_running)
3918 open_ndb_binlog_index(thd, &binlog_tables, &ndb_binlog_index);
3919 thd->db= db;
// Event loop. It ends when termination or connection close was requested and
// the handled epoch has reached *p_latest_trans_gci, or when a restart was
// requested through do_ndbcluster_binlog_close_connection.
3921 do_ndbcluster_binlog_close_connection= BCCC_running;
3922 for ( ; !((ndbcluster_binlog_terminating ||
3923 do_ndbcluster_binlog_close_connection) &&
3924 ndb_latest_handled_binlog_epoch >= *p_latest_trans_gci) &&
3925 do_ndbcluster_binlog_close_connection != BCCC_restart; )
3927 #ifndef DBUG_OFF
3928 if (do_ndbcluster_binlog_close_connection)
3930 DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection: %d, "
3931 "ndb_latest_handled_binlog_epoch: %lu, "
3932 "*p_latest_trans_gci: %lu",
3933 do_ndbcluster_binlog_close_connection,
3934 (ulong) ndb_latest_handled_binlog_epoch,
3935 (ulong) *p_latest_trans_gci));
3937 #endif
3938 #ifdef RUN_NDB_BINLOG_TIMER
3939 main_timer.stop();
3940 sql_print_information("main_timer %ld ms", main_timer.elapsed_ms());
3941 main_timer.start();
3942 #endif
3945 now we don't want any events before next gci is complete
3947 thd->proc_info= "Waiting for event from ndbcluster";
3948 thd->set_time();
// When binlogging, the full wait is spent on the injector object and the
// schema poll below uses a zero timeout; otherwise the injector events are
// only drained and the schema poll does the waiting.
3950 /* wait for event or 1000 ms */
3951 Uint64 gci= 0, schema_gci;
3952 int res= 0, tot_poll_wait= 1000;
3953 if (ndb_binlog_running)
3955 res= i_ndb->pollEvents(tot_poll_wait, &gci);
3956 tot_poll_wait= 0;
3958 else
3961 Just consume any events, not used if no binlogging
3962 e.g. node failure events
3964 Uint64 tmp_gci;
3965 if (i_ndb->pollEvents(0, &tmp_gci))
3966 while (i_ndb->nextEvent())
3969 int schema_res= s_ndb->pollEvents(tot_poll_wait, &schema_gci);
3970 ndb_latest_received_binlog_epoch= gci;
// Keep polling the schema object until its epoch has reached the data epoch.
3972 while (gci > schema_gci && schema_res >= 0)
3974 static char buf[64];
3975 thd->proc_info= "Waiting for schema epoch";
3976 my_snprintf(buf, sizeof(buf), "%s %u(%u)", thd->proc_info, (unsigned) schema_gci, (unsigned) gci);
3977 thd->proc_info= buf;
3978 schema_res= s_ndb->pollEvents(10, &schema_gci);
3981 if ((ndbcluster_binlog_terminating ||
3982 do_ndbcluster_binlog_close_connection) &&
3983 (ndb_latest_handled_binlog_epoch >= *p_latest_trans_gci ||
3984 !ndb_binlog_running))
3985 break; /* Shutting down server */
// Drop the cached ndb_binlog_index table when its share version is older
// than refresh_version.
// NOTE(review): the second condition repeats the version test of the first.
3987 if (ndb_binlog_index && ndb_binlog_index->s->version < refresh_version)
3989 if (ndb_binlog_index->s->version < refresh_version)
3991 close_thread_tables(thd);
3992 ndb_binlog_index= 0;
// A local MEM_ROOT is installed as the thread allocator root for this
// iteration; it is freed and the old root restored near the end of the loop.
3996 MEM_ROOT **root_ptr=
3997 my_pthread_getspecific_ptr(MEM_ROOT**, THR_MALLOC);
3998 MEM_ROOT *old_root= *root_ptr;
3999 MEM_ROOT mem_root;
4000 init_sql_alloc(&mem_root, 4096, 0);
4001 List<Cluster_schema> post_epoch_log_list;
4002 List<Cluster_schema> post_epoch_unlock_list;
4003 *root_ptr= &mem_root;
// Schema events: each one without an error goes to
// ndb_binlog_thread_handle_schema_event, which is handed the two post-epoch
// lists.
4005 if (unlikely(schema_res > 0))
4007 thd->proc_info= "Processing events from schema table";
4008 s_ndb->
4009 setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip);
4010 s_ndb->
4011 setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
4012 NdbEventOperation *pOp= s_ndb->nextEvent();
4013 while (pOp != NULL)
4015 if (!pOp->hasError())
4017 ndb_binlog_thread_handle_schema_event(thd, s_ndb, pOp,
4018 &post_epoch_log_list,
4019 &post_epoch_unlock_list,
4020 &mem_root);
4021 DBUG_PRINT("info", ("s_ndb first: %s", s_ndb->getEventOperation() ?
4022 s_ndb->getEventOperation()->getEvent()->getTable()->getName() :
4023 "<empty>"));
4024 DBUG_PRINT("info", ("i_ndb first: %s", i_ndb->getEventOperation() ?
4025 i_ndb->getEventOperation()->getEvent()->getTable()->getName() :
4026 "<empty>"));
// With no event operations left on either Ndb object, request a restart.
4027 if (i_ndb->getEventOperation() == NULL &&
4028 s_ndb->getEventOperation() == NULL &&
4029 do_ndbcluster_binlog_close_connection == BCCC_running)
4031 DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart"));
4032 do_ndbcluster_binlog_close_connection= BCCC_restart;
4033 if (ndb_latest_received_binlog_epoch < *p_latest_trans_gci && ndb_binlog_running)
4035 sql_print_error("NDB Binlog: latest transaction in epoch %lu not in binlog "
4036 "as latest received epoch is %lu",
4037 (ulong) *p_latest_trans_gci,
4038 (ulong) ndb_latest_received_binlog_epoch);
4042 else
4043 sql_print_error("NDB: error %lu (%s) on handling "
4044 "binlog schema event",
4045 (ulong) pOp->getNdbError().code,
4046 pOp->getNdbError().message);
4047 pOp= s_ndb->nextEvent();
// Data events: handled one epoch (gci) at a time.
4051 if (res > 0)
4053 DBUG_PRINT("info", ("pollEvents res: %d", res));
4054 thd->proc_info= "Processing events";
4055 NdbEventOperation *pOp= i_ndb->nextEvent();
4056 ndb_binlog_index_row row;
4057 while (pOp != NULL)
4059 #ifdef RUN_NDB_BINLOG_TIMER
4060 Timer gci_timer, write_timer;
4061 int event_count= 0;
4062 gci_timer.start();
4063 #endif
4064 gci= pOp->getGCI();
4065 DBUG_PRINT("info", ("Handling gci: %d", (unsigned)gci));
4066 // sometimes get TE_ALTER with invalid table
4067 DBUG_ASSERT(pOp->getEventType() == NdbDictionary::Event::TE_ALTER ||
4068 ! IS_NDB_BLOB_PREFIX(pOp->getEvent()->getTable()->getName()));
4069 DBUG_ASSERT(gci <= ndb_latest_received_binlog_epoch);
4071 /* initialize some variables for this epoch */
4072 g_ndb_log_slave_updates= opt_log_slave_updates;
4073 i_ndb->
4074 setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip);
4075 i_ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
4077 bzero((char*) &row, sizeof(row));
4078 thd->variables.character_set_client= &my_charset_latin1;
4079 injector::transaction trans;
// First pass over the operations of this epoch: register every table that
// has insert, update or delete events with the injector transaction. The
// transaction is created lazily, on the first such table.
4080 // pass table map before epoch
4082 Uint32 iter= 0;
4083 const NdbEventOperation *gci_op;
4084 Uint32 event_types;
4085 while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types))
4086 != NULL)
4088 NDB_SHARE *share= (NDB_SHARE*)gci_op->getCustomData();
4089 DBUG_PRINT("info", ("per gci_op: 0x%lx share: 0x%lx event_types: 0x%x",
4090 (long) gci_op, (long) share, event_types));
4091 // workaround for interface returning TE_STOP events
4092 // which are normally filtered out below in the nextEvent loop
4093 if ((event_types & ~NdbDictionary::Event::TE_STOP) == 0)
4095 DBUG_PRINT("info", ("Skipped TE_STOP on table %s",
4096 gci_op->getEvent()->getTable()->getName()));
4097 continue;
4099 // this should not happen
4100 if (share == NULL || share->table == NULL)
4102 DBUG_PRINT("info", ("no share or table %s!",
4103 gci_op->getEvent()->getTable()->getName()));
4104 continue;
4106 if (share == ndb_apply_status_share)
4108 // skip this table, it is handled specially
4109 continue;
4111 TABLE *table= share->table;
4112 #ifndef DBUG_OFF
4113 const LEX_STRING &name= table->s->table_name;
4114 #endif
4115 if ((event_types & (NdbDictionary::Event::TE_INSERT |
4116 NdbDictionary::Event::TE_UPDATE |
4117 NdbDictionary::Event::TE_DELETE)) == 0)
4119 DBUG_PRINT("info", ("skipping non data event table: %.*s",
4120 (int) name.length, name.str));
4121 continue;
4123 if (!trans.good())
4125 DBUG_PRINT("info",
4126 ("Found new data event, initializing transaction"));
4127 inj->new_trans(thd, &trans);
4129 DBUG_PRINT("info", ("use_table: %.*s",
4130 (int) name.length, name.str));
4131 injector::transaction::table tbl(table, TRUE);
4132 IF_DBUG(int ret=) trans.use_table(::server_id, tbl);
4133 DBUG_ASSERT(ret == 0);
// If a transaction was started, write a row for the apply status table
// carrying this server id and the epoch being handled.
4136 if (trans.good())
4138 if (ndb_apply_status_share)
4140 TABLE *table= ndb_apply_status_share->table;
4142 #ifndef DBUG_OFF
4143 const LEX_STRING& name= table->s->table_name;
4144 DBUG_PRINT("info", ("use_table: %.*s",
4145 (int) name.length, name.str));
4146 #endif
4147 injector::transaction::table tbl(table, TRUE);
4148 IF_DBUG(int ret=) trans.use_table(::server_id, tbl);
4149 DBUG_ASSERT(ret == 0);
4152 Intialize table->record[0]
4154 empty_record(table);
4156 table->field[0]->store((longlong)::server_id);
4157 table->field[1]->store((longlong)gci);
4158 table->field[2]->store("", 0, &my_charset_bin);
4159 table->field[3]->store((longlong)0);
4160 table->field[4]->store((longlong)0);
4161 trans.write_row(::server_id,
4162 injector::transaction::table(table, TRUE),
4163 &table->s->all_set, table->s->fields,
4164 table->record[0]);
4166 else
4168 sql_print_error("NDB: Could not get apply status share");
4171 #ifdef RUN_NDB_BINLOG_TIMER
4172 write_timer.start();
4173 #endif
4176 #ifdef RUN_NDB_BINLOG_TIMER
4177 event_count++;
4178 #endif
// NOTE(review): this jump to err happens while the local mem_root is still
// installed in THR_MALLOC and has not been freed - confirm that is acceptable
// on the shutdown path.
4179 if (pOp->hasError() &&
4180 ndb_binlog_thread_handle_error(i_ndb, pOp, row) < 0)
4181 goto err;
4183 #ifndef DBUG_OFF
4185 NDB_SHARE *share= (NDB_SHARE*) pOp->getCustomData();
4186 DBUG_PRINT("info",
4187 ("EVENT TYPE: %d GCI: %ld last applied: %ld "
4188 "share: 0x%lx (%s.%s)", pOp->getEventType(),
4189 (long) gci,
4190 (long) ndb_latest_applied_binlog_epoch,
4191 (long) share,
4192 share ? share->db : "'NULL'",
4193 share ? share->table_name : "'NULL'"));
4194 DBUG_ASSERT(share != 0);
4196 // assert that there is consistancy between gci op list
4197 // and event list
4199 Uint32 iter= 0;
4200 const NdbEventOperation *gci_op;
4201 Uint32 event_types;
4202 while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types))
4203 != NULL)
4205 if (gci_op == pOp)
4206 break;
4208 DBUG_ASSERT(gci_op == pOp);
4209 DBUG_ASSERT((event_types & pOp->getEventType()) != 0);
4211 #endif
// Event types below TE_FIRST_NON_DATA_EVENT are row changes; everything else
// goes to the non data handler with the Ndb database set from the table.
4212 if ((unsigned) pOp->getEventType() <
4213 (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
4214 ndb_binlog_thread_handle_data_event(i_ndb, pOp, row, trans);
4215 else
4217 // set injector_ndb database/schema from table internal name
4218 IF_DBUG(int ret=)
4219 i_ndb->setDatabaseAndSchemaName(pOp->getEvent()->getTable());
4220 DBUG_ASSERT(ret == 0);
4221 ndb_binlog_thread_handle_non_data_event(thd, i_ndb, pOp, row);
4222 // reset to catch errors
4223 i_ndb->setDatabaseName("");
4224 DBUG_PRINT("info", ("s_ndb first: %s", s_ndb->getEventOperation() ?
4225 s_ndb->getEventOperation()->getEvent()->getTable()->getName() :
4226 "<empty>"));
4227 DBUG_PRINT("info", ("i_ndb first: %s", i_ndb->getEventOperation() ?
4228 i_ndb->getEventOperation()->getEvent()->getTable()->getName() :
4229 "<empty>"));
4230 if (i_ndb->getEventOperation() == NULL &&
4231 s_ndb->getEventOperation() == NULL &&
4232 do_ndbcluster_binlog_close_connection == BCCC_running)
4234 DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart"));
4235 do_ndbcluster_binlog_close_connection= BCCC_restart;
4236 if (ndb_latest_received_binlog_epoch < *p_latest_trans_gci && ndb_binlog_running)
4238 sql_print_error("NDB Binlog: latest transaction in epoch %lu not in binlog "
4239 "as latest received epoch is %lu",
4240 (ulong) *p_latest_trans_gci,
4241 (ulong) ndb_latest_received_binlog_epoch);
4246 pOp= i_ndb->nextEvent();
4247 } while (pOp && pOp->getGCI() == gci);
4250 note! pOp is not referring to an event in the next epoch
4251 or is == 0
4253 #ifdef RUN_NDB_BINLOG_TIMER
4254 write_timer.stop();
4255 #endif
// Commit the epoch to the binlog and record where it started.
4257 if (trans.good())
4259 //DBUG_ASSERT(row.n_inserts || row.n_updates || row.n_deletes);
4260 thd->proc_info= "Committing events to binlog";
4261 injector::transaction::binlog_pos start= trans.start_pos();
4262 if (int r= trans.commit())
4264 sql_print_error("NDB Binlog: "
4265 "Error during COMMIT of GCI. Error: %d",
4267 /* TODO: Further handling? */
4269 row.gci= gci;
4270 row.master_log_file= start.file_name();
4271 row.master_log_pos= start.file_pos();
4273 DBUG_PRINT("info", ("COMMIT gci: %lu", (ulong) gci));
4274 if (ndb_update_ndb_binlog_index)
4275 ndb_add_ndb_binlog_index(thd, &row);
4276 ndb_latest_applied_binlog_epoch= gci;
4278 ndb_latest_handled_binlog_epoch= gci;
4279 #ifdef RUN_NDB_BINLOG_TIMER
4280 gci_timer.stop();
4281 sql_print_information("gci %ld event_count %d write time "
4282 "%ld(%d e/s), total time %ld(%d e/s)",
4283 (ulong)gci, event_count,
4284 write_timer.elapsed_ms(),
4285 (1000*event_count) / write_timer.elapsed_ms(),
4286 gci_timer.elapsed_ms(),
4287 (1000*event_count) / gci_timer.elapsed_ms());
4288 #endif
// End of iteration: run the post-epoch schema handling, free the local
// mem_root and restore the previous allocator root.
4292 ndb_binlog_thread_handle_schema_event_post_epoch(thd,
4293 &post_epoch_log_list,
4294 &post_epoch_unlock_list);
4295 free_root(&mem_root, MYF(0));
4296 *root_ptr= old_root;
4297 ndb_latest_handled_binlog_epoch= ndb_latest_received_binlog_epoch;
// A requested restart closes the thread tables and jumps back to restart.
4299 if (do_ndbcluster_binlog_close_connection == BCCC_restart)
4301 ndb_binlog_tables_inited= FALSE;
4302 close_thread_tables(thd);
4303 ndb_binlog_index= 0;
4304 goto restart;
// Shutdown and error exit: unpublish the globals under injector_mutex, drop
// the share references, remove all event operations and delete both Ndb
// objects and the THD.
4306 err:
4307 sql_print_information("Stopping Cluster Binlog");
4308 DBUG_PRINT("info",("Shutting down cluster binlog thread"));
4309 thd->proc_info= "Shutting down";
4310 close_thread_tables(thd);
4311 pthread_mutex_lock(&injector_mutex);
4312 /* don't mess with the injector_ndb anymore from other threads */
4313 injector_thd= 0;
4314 injector_ndb= 0;
4315 p_latest_trans_gci= 0;
4316 schema_ndb= 0;
4317 pthread_mutex_unlock(&injector_mutex);
4318 thd->db= 0; // as not to try to free memory
4320 if (ndb_apply_status_share)
4322 /* ndb_share reference binlog extra free */
4323 DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
4324 ndb_apply_status_share->key,
4325 ndb_apply_status_share->use_count));
4326 free_share(&ndb_apply_status_share);
4327 ndb_apply_status_share= 0;
4329 if (ndb_schema_share)
4331 /* begin protect ndb_schema_share */
4332 pthread_mutex_lock(&ndb_schema_share_mutex);
4333 /* ndb_share reference binlog extra free */
4334 DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
4335 ndb_schema_share->key,
4336 ndb_schema_share->use_count));
4337 free_share(&ndb_schema_share);
4338 ndb_schema_share= 0;
4339 ndb_binlog_tables_inited= 0;
4340 pthread_mutex_unlock(&ndb_schema_share_mutex);
4341 /* end protect ndb_schema_share */
// For every remaining event operation: clear the op pointers on its share,
// release the share reference, then drop the operation. Done for s_ndb first
// and then for i_ndb.
4344 /* remove all event operations */
4345 if (s_ndb)
4347 NdbEventOperation *op;
4348 DBUG_PRINT("info",("removing all event operations"));
4349 while ((op= s_ndb->getEventOperation()))
4351 DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getEvent()->getTable()->getName()));
4352 DBUG_PRINT("info",("removing event operation on %s",
4353 op->getEvent()->getName()));
4354 NDB_SHARE *share= (NDB_SHARE*) op->getCustomData();
4355 DBUG_ASSERT(share != 0);
4356 DBUG_ASSERT(share->op == op ||
4357 share->op_old == op);
4358 share->op= share->op_old= 0;
4359 /* ndb_share reference binlog free */
4360 DBUG_PRINT("NDB_SHARE", ("%s binlog free use_count: %u",
4361 share->key, share->use_count));
4362 free_share(&share);
4363 s_ndb->dropEventOperation(op);
4365 delete s_ndb;
4366 s_ndb= 0;
4368 if (i_ndb)
4370 NdbEventOperation *op;
4371 DBUG_PRINT("info",("removing all event operations"));
4372 while ((op= i_ndb->getEventOperation()))
4374 DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getEvent()->getTable()->getName()));
4375 DBUG_PRINT("info",("removing event operation on %s",
4376 op->getEvent()->getName()));
4377 NDB_SHARE *share= (NDB_SHARE*) op->getCustomData();
4378 DBUG_ASSERT(share != 0);
4379 DBUG_ASSERT(share->op == op ||
4380 share->op_old == op);
4381 share->op= share->op_old= 0;
4382 /* ndb_share reference binlog free */
4383 DBUG_PRINT("NDB_SHARE", ("%s binlog free use_count: %u",
4384 share->key, share->use_count));
4385 free_share(&share);
4386 i_ndb->dropEventOperation(op);
4388 delete i_ndb;
4389 i_ndb= 0;
// NOTE(review): hash_free is also reached when start-up failed before
// hash_init ran - confirm that freeing a never initialised hash is harmless.
4392 hash_free(&ndb_schema_objects);
4394 net_end(&thd->net);
4395 thd->cleanup();
4396 delete thd;
// NOTE(review): the two status flags below are written after injector_mutex
// was released above - confirm that waiters on injector_cond tolerate this.
4398 ndb_binlog_thread_running= -1;
4399 ndb_binlog_running= FALSE;
4400 (void) pthread_cond_signal(&injector_cond);
4402 DBUG_PRINT("exit", ("ndb_binlog_thread"));
4404 DBUG_LEAVE; // Must match DBUG_ENTER()
4405 my_thread_end();
4406 pthread_exit(0);
4407 return NULL; // Avoid compiler warnings
4410 bool
4411 ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print,
4412 enum ha_stat_type stat_type)
4414 char buf[IO_SIZE];
4415 uint buflen;
4416 ulonglong ndb_latest_epoch= 0;
4417 DBUG_ENTER("ndbcluster_show_status_binlog");
4419 pthread_mutex_lock(&injector_mutex);
4420 if (injector_ndb)
4422 char buff1[22],buff2[22],buff3[22],buff4[22],buff5[22];
4423 ndb_latest_epoch= injector_ndb->getLatestGCI();
4424 pthread_mutex_unlock(&injector_mutex);
4426 buflen=
4427 snprintf(buf, sizeof(buf),
4428 "latest_epoch=%s, "
4429 "latest_trans_epoch=%s, "
4430 "latest_received_binlog_epoch=%s, "
4431 "latest_handled_binlog_epoch=%s, "
4432 "latest_applied_binlog_epoch=%s",
4433 llstr(ndb_latest_epoch, buff1),
4434 llstr(*p_latest_trans_gci, buff2),
4435 llstr(ndb_latest_received_binlog_epoch, buff3),
4436 llstr(ndb_latest_handled_binlog_epoch, buff4),
4437 llstr(ndb_latest_applied_binlog_epoch, buff5));
4438 if (stat_print(thd, ndbcluster_hton_name, ndbcluster_hton_name_length,
4439 "binlog", strlen("binlog"),
4440 buf, buflen))
4441 DBUG_RETURN(TRUE);
4443 else
4444 pthread_mutex_unlock(&injector_mutex);
4445 DBUG_RETURN(FALSE);
4448 #endif /* HAVE_NDB_BINLOG */
4449 #endif