1 /* Copyright (c) 2006, 2012, Oracle and/or its affiliates. All rights reserved.
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
16 #include "mysql_priv.h"
18 #ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
19 #include "ha_ndbcluster.h"
21 #ifdef HAVE_NDB_BINLOG
22 #include "rpl_injector.h"
23 #include "rpl_filter.h"
25 #include "log_event.h"
26 #include "ha_ndbcluster_binlog.h"
27 #include "NdbDictionary.hpp"
28 #include "ndb_cluster_connection.hpp"
29 #include <util/NdbAutoPtr.hpp>
33 #define assert(x) do { if(x) break; ::printf("%s %d: assert failed: %s\n", __FILE__, __LINE__, #x); ::fflush(stdout); ::signal(SIGABRT,SIG_DFL); ::abort(); ::kill(::getpid(),6); ::kill(::getpid(),9); } while (0)
37 defines for cluster replication table names
39 #include "ha_ndbcluster_tables.h"
40 #define NDB_APPLY_TABLE_FILE "./" NDB_REP_DB "/" NDB_APPLY_TABLE
41 #define NDB_SCHEMA_TABLE_FILE "./" NDB_REP_DB "/" NDB_SCHEMA_TABLE
44 Timeout for syncing schema events between
45 mysql servers, and between mysql server and the binlog
47 const int opt_ndb_sync_timeout
= 120;
50 Flag showing if the ndb injector thread is running, if so == 1
51 -1 if it was started but later stopped for some reason
54 int ndb_binlog_thread_running
= 0;
56 Flag showing if the ndb binlog should be created, if so == TRUE
59 my_bool ndb_binlog_running
= FALSE
;
60 my_bool ndb_binlog_tables_inited
= FALSE
;
63 Global reference to the ndb injector thread THD object
65 Has one sole purpose, for setting the in_use table member variable
71 Global reference to ndb injector thd object.
73 Used mainly by the binlog index thread, but exposed to the client sql
74 thread for one reason; to setup the events operations for a table
75 to enable ndb injector thread receiving events.
77 Must therefore always be used with a surrounding
78 pthread_mutex_lock(&injector_mutex), when doing create/dropEventOperation
80 static Ndb
*injector_ndb
= 0;
81 static Ndb
*schema_ndb
= 0;
83 static int ndbcluster_binlog_inited
= 0;
85 Flag "ndbcluster_binlog_terminating" set when shutting down mysqld.
86 Server main loop should call handlerton function:
88 ndbcluster_hton->binlog_func ==
89 ndbcluster_binlog_func(...,BFN_BINLOG_END,...) ==
92 at shutdown, which sets the flag. And then server needs to wait for it
93 to complete. Otherwise binlog will not be complete.
95 ndbcluster_hton->panic == ndbcluster_end() will not return until
96 ndb binlog is completed
98 static int ndbcluster_binlog_terminating
= 0;
101 Mutex and condition used for interacting between client sql thread
104 pthread_t ndb_binlog_thread
;
105 pthread_mutex_t injector_mutex
;
106 pthread_cond_t injector_cond
;
108 /* NDB Injector thread (used for binlog creation) */
109 static ulonglong ndb_latest_applied_binlog_epoch
= 0;
110 static ulonglong ndb_latest_handled_binlog_epoch
= 0;
111 static ulonglong ndb_latest_received_binlog_epoch
= 0;
113 NDB_SHARE
*ndb_apply_status_share
= 0;
114 NDB_SHARE
*ndb_schema_share
= 0;
115 pthread_mutex_t ndb_schema_share_mutex
;
117 extern my_bool opt_log_slave_updates
;
118 static my_bool g_ndb_log_slave_updates
;
120 /* Schema object distribution handling */
121 HASH ndb_schema_objects
;
122 typedef struct st_ndb_schema_object
{
123 pthread_mutex_t mutex
;
127 MY_BITMAP slock_bitmap
;
128 uint32 slock
[256/32]; // 256 bits for lock status of table
130 static NDB_SCHEMA_OBJECT
*ndb_get_schema_object(const char *key
,
131 my_bool create_if_not_exists
,
133 static void ndb_free_schema_object(NDB_SCHEMA_OBJECT
**ndb_schema_object
,
136 static Uint64
*p_latest_trans_gci
= 0;
139 Global variables for holding the ndb_binlog_index table reference
141 static TABLE
*ndb_binlog_index
= 0;
142 static TABLE_LIST binlog_tables
;
149 /* purecov: begin deadcode */
150 static void print_records(TABLE
*table
, const uchar
*record
)
152 for (uint j
= 0; j
< table
->s
->fields
; j
++)
156 Field
*field
= table
->field
[j
];
157 const uchar
* field_ptr
= field
->ptr
- table
->record
[0] + record
;
158 int pack_len
= field
->pack_length();
159 int n
= pack_len
< 10 ? pack_len
: 10;
161 for (int i
= 0; i
< n
&& pos
< 20; i
++)
163 pos
+= sprintf(&buf
[pos
]," %x", (int) (uchar
) field_ptr
[i
]);
166 DBUG_PRINT("info",("[%u]field_ptr[0->%d]: %s", j
, n
, buf
));
171 #define print_records(a,b)
176 static void dbug_print_table(const char *info
, TABLE
*table
)
180 DBUG_PRINT("info",("%s: (null)", info
));
184 ("%s: %s.%s s->fields: %d "
185 "reclength: %lu rec_buff_length: %u record[0]: 0x%lx "
189 table
->s
->table_name
.str
,
192 table
->s
->rec_buff_length
,
193 (long) table
->record
[0],
194 (long) table
->record
[1]));
196 for (unsigned int i
= 0; i
< table
->s
->fields
; i
++)
198 Field
*f
= table
->field
[i
];
200 ("[%d] \"%s\"(0x%lx:%s%s%s%s%s%s) type: %d pack_length: %d "
201 "ptr: 0x%lx[+%d] null_bit: %u null_ptr: 0x%lx[+%d]",
205 (f
->flags
& PRI_KEY_FLAG
) ? "pri" : "attr",
206 (f
->flags
& NOT_NULL_FLAG
) ? "" : ",nullable",
207 (f
->flags
& UNSIGNED_FLAG
) ? ",unsigned" : ",signed",
208 (f
->flags
& ZEROFILL_FLAG
) ? ",zerofill" : "",
209 (f
->flags
& BLOB_FLAG
) ? ",blob" : "",
210 (f
->flags
& BINARY_FLAG
) ? ",binary" : "",
213 (long) f
->ptr
, (int) (f
->ptr
- table
->record
[0]),
216 (int) ((uchar
*) f
->null_ptr
- table
->record
[0])));
217 if (f
->type() == MYSQL_TYPE_BIT
)
219 Field_bit
*g
= (Field_bit
*) f
;
220 DBUG_PRINT("MYSQL_TYPE_BIT",("field_length: %d bit_ptr: 0x%lx[+%d] "
221 "bit_ofs: %d bit_len: %u",
222 g
->field_length
, (long) g
->bit_ptr
,
223 (int) ((uchar
*) g
->bit_ptr
-
225 g
->bit_ofs
, g
->bit_len
));
230 #define dbug_print_table(a,b)
235 Run a query through mysql_parse
238 - purging the ndb_binlog_index
239 - creating the ndb_apply_status table
241 static void run_query(THD
*thd
, char *buf
, char *end
,
242 const int *no_print_error
, my_bool disable_binlog
)
244 ulong save_thd_query_length
= thd
->query_length();
245 char *save_thd_query
= thd
->query();
246 ulong save_thread_id
= thd
->variables
.pseudo_thread_id
;
247 struct system_status_var save_thd_status_var
= thd
->status_var
;
248 THD_TRANS save_thd_transaction_all
= thd
->transaction
.all
;
249 THD_TRANS save_thd_transaction_stmt
= thd
->transaction
.stmt
;
250 ulonglong save_thd_options
= thd
->options
;
251 DBUG_ASSERT(sizeof(save_thd_options
) == sizeof(thd
->options
));
252 NET save_thd_net
= thd
->net
;
253 const char* found_semicolon
= NULL
;
255 bzero((char*) &thd
->net
, sizeof(NET
));
256 thd
->set_query(buf
, (uint
) (end
- buf
));
257 thd
->variables
.pseudo_thread_id
= thread_id
;
258 thd
->transaction
.stmt
.modified_non_trans_table
= FALSE
;
260 thd
->options
&= ~OPTION_BIN_LOG
;
262 DBUG_PRINT("query", ("%s", thd
->query()));
264 DBUG_ASSERT(!thd
->in_sub_stmt
);
265 DBUG_ASSERT(!thd
->prelocked_mode
);
267 mysql_parse(thd
, thd
->query(), thd
->query_length(), &found_semicolon
);
269 if (no_print_error
&& thd
->is_slave_error
)
272 Thd_ndb
*thd_ndb
= get_thd_ndb(thd
);
273 for (i
= 0; no_print_error
[i
]; i
++)
274 if ((thd_ndb
->m_error_code
== no_print_error
[i
]) ||
275 (thd
->main_da
.sql_errno() == (unsigned) no_print_error
[i
]))
277 if (!no_print_error
[i
])
278 sql_print_error("NDB: %s: error %s %d(ndb: %d) %d %d",
280 thd
->main_da
.message(),
281 thd
->main_da
.sql_errno(),
282 thd_ndb
->m_error_code
,
283 (int) thd
->is_error(), thd
->is_slave_error
);
285 close_thread_tables(thd
);
287 XXX: this code is broken. mysql_parse()/mysql_reset_thd_for_next_command()
288 can not be called from within a statement, and
289 run_query() can be called from anywhere, including from within
291 This particular reset is a temporary hack to avoid an assert
292 for double assignment of the diagnostics area when run_query()
293 is called from ndbcluster_reset_logs(), which is called from
296 thd
->main_da
.reset_diagnostics_area();
298 thd
->options
= save_thd_options
;
299 thd
->set_query(save_thd_query
, save_thd_query_length
);
300 thd
->variables
.pseudo_thread_id
= save_thread_id
;
301 thd
->status_var
= save_thd_status_var
;
302 thd
->transaction
.all
= save_thd_transaction_all
;
303 thd
->transaction
.stmt
= save_thd_transaction_stmt
;
304 thd
->net
= save_thd_net
;
306 if (thd
== injector_thd
)
309 running the query will close all tables, including the ndb_binlog_index
317 ndbcluster_binlog_close_table(THD
*thd
, NDB_SHARE
*share
)
319 DBUG_ENTER("ndbcluster_binlog_close_table");
320 if (share
->table_share
)
322 closefrm(share
->table
, 1);
323 share
->table_share
= 0;
326 DBUG_ASSERT(share
->table
== 0);
332 Creates a TABLE object for the ndb cluster table
335 This does not open the underlying table
339 ndbcluster_binlog_open_table(THD
*thd
, NDB_SHARE
*share
,
340 TABLE_SHARE
*table_share
, TABLE
*table
,
344 DBUG_ENTER("ndbcluster_binlog_open_table");
346 safe_mutex_assert_owner(&LOCK_open
);
347 init_tmp_table_share(thd
, table_share
, share
->db
, 0, share
->table_name
,
349 if ((error
= open_table_def(thd
, table_share
, 0)))
351 DBUG_PRINT("error", ("open_table_def failed: %d my_errno: %d", error
, my_errno
));
352 free_table_share(table_share
);
355 if ((error
= open_table_from_share(thd
, table_share
, "", 0 /* fon't allocate buffers */,
356 (uint
) READ_ALL
, 0, table
, FALSE
)))
358 DBUG_PRINT("error", ("open_table_from_share failed %d my_errno: %d", error
, my_errno
));
359 free_table_share(table_share
);
362 assign_new_table_id(table_share
);
366 // allocate memory on ndb share so it can be reused after online alter table
367 (void)multi_alloc_root(&share
->mem_root
,
368 &(share
->record
[0]), table
->s
->rec_buff_length
,
369 &(share
->record
[1]), table
->s
->rec_buff_length
,
373 my_ptrdiff_t row_offset
= share
->record
[0] - table
->record
[0];
375 for (p_field
= table
->field
; *p_field
; p_field
++)
376 (*p_field
)->move_field_offset(row_offset
);
377 table
->record
[0]= share
->record
[0];
378 table
->record
[1]= share
->record
[1];
381 table
->in_use
= injector_thd
;
383 table
->s
->db
.str
= share
->db
;
384 table
->s
->db
.length
= strlen(share
->db
);
385 table
->s
->table_name
.str
= share
->table_name
;
386 table
->s
->table_name
.length
= strlen(share
->table_name
);
388 DBUG_ASSERT(share
->table_share
== 0);
389 share
->table_share
= table_share
;
390 DBUG_ASSERT(share
->table
== 0);
392 /* We can't use 'use_all_columns()' as the file object is not setup yet */
393 table
->column_bitmaps_set_no_signal(&table
->s
->all_set
, &table
->s
->all_set
);
395 dbug_print_table("table", table
);
402 Initialize the binlog part of the NDB_SHARE
404 int ndbcluster_binlog_init_share(NDB_SHARE
*share
, TABLE
*_table
)
406 THD
*thd
= current_thd
;
407 MEM_ROOT
*mem_root
= &share
->mem_root
;
408 int do_event_op
= ndb_binlog_running
;
410 DBUG_ENTER("ndbcluster_binlog_init_share");
412 share
->connect_count
= g_ndb_cluster_connection
->get_connect_count();
417 if (!ndb_schema_share
&&
418 strcmp(share
->db
, NDB_REP_DB
) == 0 &&
419 strcmp(share
->table_name
, NDB_SCHEMA_TABLE
) == 0)
421 else if (!ndb_apply_status_share
&&
422 strcmp(share
->db
, NDB_REP_DB
) == 0 &&
423 strcmp(share
->table_name
, NDB_APPLY_TABLE
) == 0)
427 int i
, no_nodes
= g_ndb_cluster_connection
->no_db_nodes();
428 share
->subscriber_bitmap
= (MY_BITMAP
*)
429 alloc_root(mem_root
, no_nodes
* sizeof(MY_BITMAP
));
430 for (i
= 0; i
< no_nodes
; i
++)
432 bitmap_init(&share
->subscriber_bitmap
[i
],
433 (Uint32
*)alloc_root(mem_root
, max_ndb_nodes
/8),
434 max_ndb_nodes
, FALSE
);
435 bitmap_clear_all(&share
->subscriber_bitmap
[i
]);
443 if (_table
->s
->primary_key
== MAX_KEY
)
444 share
->flags
|= NSF_HIDDEN_PK
;
445 if (_table
->s
->blob_fields
!= 0)
446 share
->flags
|= NSF_BLOB_FLAG
;
450 share
->flags
|= NSF_NO_BINLOG
;
457 TABLE_SHARE
*table_share
= (TABLE_SHARE
*) alloc_root(mem_root
, sizeof(*table_share
));
458 TABLE
*table
= (TABLE
*) alloc_root(mem_root
, sizeof(*table
));
459 if ((error
= ndbcluster_binlog_open_table(thd
, share
, table_share
, table
, 0)))
462 ! do not touch the contents of the table
463 it may be in use by the injector thread
465 MEM_ROOT
*mem_root
= &share
->mem_root
;
466 share
->ndb_value
[0]= (NdbValue
*)
467 alloc_root(mem_root
, sizeof(NdbValue
) *
468 (table
->s
->fields
+ 2 /*extra for hidden key and part key*/));
469 share
->ndb_value
[1]= (NdbValue
*)
470 alloc_root(mem_root
, sizeof(NdbValue
) *
471 (table
->s
->fields
+ 2 /*extra for hidden key and part key*/));
473 if (table
->s
->primary_key
== MAX_KEY
)
474 share
->flags
|= NSF_HIDDEN_PK
;
475 if (table
->s
->blob_fields
!= 0)
476 share
->flags
|= NSF_BLOB_FLAG
;
482 /*****************************************************************
483 functions called from master sql client threads
484 ****************************************************************/
487 called in mysql_show_binlog_events and reset_logs to make sure we wait for
488 all events originating from this mysql server to arrive in the binlog
490 Wait for the last epoch in which the last transaction is a part of.
492 Wait a maximum of 30 seconds.
494 static void ndbcluster_binlog_wait(THD
*thd
)
496 if (ndb_binlog_running
)
498 DBUG_ENTER("ndbcluster_binlog_wait");
499 const char *save_info
= thd
? thd
->proc_info
: 0;
500 ulonglong wait_epoch
= *p_latest_trans_gci
;
503 thd
->proc_info
= "Waiting for ndbcluster binlog update to "
504 "reach current position";
505 while (count
&& ndb_binlog_running
&&
506 ndb_latest_handled_binlog_epoch
< wait_epoch
)
512 thd
->proc_info
= save_info
;
518 Called from MYSQL_BIN_LOG::reset_logs in log.cc when binlog is emptied
520 static int ndbcluster_reset_logs(THD
*thd
)
522 if (!ndb_binlog_running
)
525 DBUG_ENTER("ndbcluster_reset_logs");
528 Wait until all events originating from this mysql server have
529 reached the binlog before continuing to reset
531 ndbcluster_binlog_wait(thd
);
534 char *end
= strmov(buf
, "DELETE FROM " NDB_REP_DB
"." NDB_REP_TABLE
);
536 run_query(thd
, buf
, end
, NULL
, TRUE
);
542 Called from MYSQL_BIN_LOG::purge_logs in log.cc when the binlog "file"
547 ndbcluster_binlog_index_purge_file(THD
*thd
, const char *file
)
549 if (!ndb_binlog_running
|| thd
->slave_thread
)
552 DBUG_ENTER("ndbcluster_binlog_index_purge_file");
553 DBUG_PRINT("enter", ("file: %s", file
));
556 char *end
= strmov(strmov(strmov(buf
,
558 NDB_REP_DB
"." NDB_REP_TABLE
559 " WHERE File='"), file
), "'");
561 run_query(thd
, buf
, end
, NULL
, TRUE
);
567 ndbcluster_binlog_log_query(handlerton
*hton
, THD
*thd
, enum_binlog_command binlog_command
,
568 const char *query
, uint query_length
,
569 const char *db
, const char *table_name
)
571 DBUG_ENTER("ndbcluster_binlog_log_query");
572 DBUG_PRINT("enter", ("db: %s table_name: %s query: %s",
573 db
, table_name
, query
));
574 enum SCHEMA_OP_TYPE type
;
576 switch (binlog_command
)
578 case LOGCOM_CREATE_TABLE
:
579 type
= SOT_CREATE_TABLE
;
582 case LOGCOM_ALTER_TABLE
:
583 type
= SOT_ALTER_TABLE
;
586 case LOGCOM_RENAME_TABLE
:
587 type
= SOT_RENAME_TABLE
;
590 case LOGCOM_DROP_TABLE
:
591 type
= SOT_DROP_TABLE
;
594 case LOGCOM_CREATE_DB
:
598 case LOGCOM_ALTER_DB
:
609 ndbcluster_log_schema_op(thd
, 0, query
, query_length
,
610 db
, table_name
, 0, 0, type
,
618 End use of the NDB Cluster binlog
619 - wait for binlog thread to shutdown
622 static int ndbcluster_binlog_end(THD
*thd
)
624 DBUG_ENTER("ndbcluster_binlog_end");
626 if (!ndbcluster_binlog_inited
)
628 ndbcluster_binlog_inited
= 0;
630 #ifdef HAVE_NDB_BINLOG
631 if (ndb_util_thread_running
> 0)
634 Wait for util thread to die (as this uses the injector mutex)
635 There is a very small chance that ndb_util_thread dies and the
636 following mutex is freed before it's accessed. This shouldn't
637 however be a likely case as the ndbcluster_binlog_end is supposed to
638 be called before ndb_cluster_end().
640 pthread_mutex_lock(&LOCK_ndb_util_thread
);
641 /* Ensure mutex are not freed if ndb_cluster_end is running at same time */
642 ndb_util_thread_running
++;
643 ndbcluster_terminating
= 1;
644 pthread_cond_signal(&COND_ndb_util_thread
);
645 while (ndb_util_thread_running
> 1)
646 pthread_cond_wait(&COND_ndb_util_ready
, &LOCK_ndb_util_thread
);
647 ndb_util_thread_running
--;
648 pthread_mutex_unlock(&LOCK_ndb_util_thread
);
651 /* wait for injector thread to finish */
652 ndbcluster_binlog_terminating
= 1;
653 pthread_mutex_lock(&injector_mutex
);
654 pthread_cond_signal(&injector_cond
);
655 while (ndb_binlog_thread_running
> 0)
656 pthread_cond_wait(&injector_cond
, &injector_mutex
);
657 pthread_mutex_unlock(&injector_mutex
);
659 pthread_mutex_destroy(&injector_mutex
);
660 pthread_cond_destroy(&injector_cond
);
661 pthread_mutex_destroy(&ndb_schema_share_mutex
);
667 /*****************************************************************
668 functions called from slave sql client threads
669 ****************************************************************/
670 static void ndbcluster_reset_slave(THD
*thd
)
672 if (!ndb_binlog_running
)
675 DBUG_ENTER("ndbcluster_reset_slave");
677 char *end
= strmov(buf
, "DELETE FROM " NDB_REP_DB
"." NDB_APPLY_TABLE
);
678 run_query(thd
, buf
, end
, NULL
, TRUE
);
683 Initialize the binlog part of the ndb handlerton
687 Upon the sql command flush logs, we need to ensure that all outstanding
688 ndb data to be logged has made it to the binary log to get a deterministic
689 behavior on the rotation of the log.
691 static bool ndbcluster_flush_logs(handlerton
*hton
)
693 ndbcluster_binlog_wait(current_thd
);
697 static int ndbcluster_binlog_func(handlerton
*hton
, THD
*thd
,
704 ndbcluster_reset_logs(thd
);
706 case BFN_RESET_SLAVE
:
707 ndbcluster_reset_slave(thd
);
709 case BFN_BINLOG_WAIT
:
710 ndbcluster_binlog_wait(thd
);
713 ndbcluster_binlog_end(thd
);
715 case BFN_BINLOG_PURGE_FILE
:
716 ndbcluster_binlog_index_purge_file(thd
, (const char *)arg
);
722 void ndbcluster_binlog_init_handlerton()
724 handlerton
*h
= ndbcluster_hton
;
725 h
->flush_logs
= ndbcluster_flush_logs
;
726 h
->binlog_func
= ndbcluster_binlog_func
;
727 h
->binlog_log_query
= ndbcluster_binlog_log_query
;
735 check the availability af the ndb_apply_status share
736 - return share, but do not increase refcount
737 - return 0 if there is no share
739 static NDB_SHARE
*ndbcluster_check_ndb_apply_status_share()
741 pthread_mutex_lock(&ndbcluster_mutex
);
743 void *share
= hash_search(&ndbcluster_open_tables
,
744 (uchar
*) NDB_APPLY_TABLE_FILE
,
745 sizeof(NDB_APPLY_TABLE_FILE
) - 1);
746 DBUG_PRINT("info",("ndbcluster_check_ndb_apply_status_share %s 0x%lx",
747 NDB_APPLY_TABLE_FILE
, (long) share
));
748 pthread_mutex_unlock(&ndbcluster_mutex
);
749 return (NDB_SHARE
*) share
;
753 check the availability af the schema share
754 - return share, but do not increase refcount
755 - return 0 if there is no share
757 static NDB_SHARE
*ndbcluster_check_ndb_schema_share()
759 pthread_mutex_lock(&ndbcluster_mutex
);
761 void *share
= hash_search(&ndbcluster_open_tables
,
762 (uchar
*) NDB_SCHEMA_TABLE_FILE
,
763 sizeof(NDB_SCHEMA_TABLE_FILE
) - 1);
764 DBUG_PRINT("info",("ndbcluster_check_ndb_schema_share %s 0x%lx",
765 NDB_SCHEMA_TABLE_FILE
, (long) share
));
766 pthread_mutex_unlock(&ndbcluster_mutex
);
767 return (NDB_SHARE
*) share
;
771 Create the ndb_apply_status table
773 static int ndbcluster_create_ndb_apply_status_table(THD
*thd
)
775 DBUG_ENTER("ndbcluster_create_ndb_apply_status_table");
778 Check if we already have the apply status table.
779 If so it should have been discovered at startup
780 and thus have a share
783 if (ndbcluster_check_ndb_apply_status_share())
786 if (g_ndb_cluster_connection
->get_no_ready() <= 0)
789 char buf
[1024 + 1], *end
;
791 if (ndb_extra_logging
)
792 sql_print_information("NDB: Creating " NDB_REP_DB
"." NDB_APPLY_TABLE
);
795 Check if apply status table exists in MySQL "dictionary"
796 if so, remove it since there is none in Ndb
799 build_table_filename(buf
, sizeof(buf
) - 1,
800 NDB_REP_DB
, NDB_APPLY_TABLE
, reg_ext
, 0);
801 my_delete(buf
, MYF(0));
805 Note, updating this table schema must be reflected in ndb_restore
807 end
= strmov(buf
, "CREATE TABLE IF NOT EXISTS "
808 NDB_REP_DB
"." NDB_APPLY_TABLE
809 " ( server_id INT UNSIGNED NOT NULL,"
810 " epoch BIGINT UNSIGNED NOT NULL, "
811 " log_name VARCHAR(255) BINARY NOT NULL, "
812 " start_pos BIGINT UNSIGNED NOT NULL, "
813 " end_pos BIGINT UNSIGNED NOT NULL, "
814 " PRIMARY KEY USING HASH (server_id) ) ENGINE=NDB CHARACTER SET latin1");
816 const int no_print_error
[6]= {ER_TABLE_EXISTS_ERROR
,
819 721, // Table already exist
821 0}; // do not print error 701 etc
822 run_query(thd
, buf
, end
, no_print_error
, TRUE
);
829 Create the schema table
831 static int ndbcluster_create_schema_table(THD
*thd
)
833 DBUG_ENTER("ndbcluster_create_schema_table");
836 Check if we already have the schema table.
837 If so it should have been discovered at startup
838 and thus have a share
841 if (ndbcluster_check_ndb_schema_share())
844 if (g_ndb_cluster_connection
->get_no_ready() <= 0)
847 char buf
[1024 + 1], *end
;
849 if (ndb_extra_logging
)
850 sql_print_information("NDB: Creating " NDB_REP_DB
"." NDB_SCHEMA_TABLE
);
853 Check if schema table exists in MySQL "dictionary"
854 if so, remove it since there is none in Ndb
857 build_table_filename(buf
, sizeof(buf
) - 1,
858 NDB_REP_DB
, NDB_SCHEMA_TABLE
, reg_ext
, 0);
859 my_delete(buf
, MYF(0));
863 Update the defines below to reflect the table schema
865 end
= strmov(buf
, "CREATE TABLE IF NOT EXISTS "
866 NDB_REP_DB
"." NDB_SCHEMA_TABLE
867 " ( db VARBINARY(63) NOT NULL,"
868 " name VARBINARY(63) NOT NULL,"
869 " slock BINARY(32) NOT NULL,"
870 " query BLOB NOT NULL,"
871 " node_id INT UNSIGNED NOT NULL,"
872 " epoch BIGINT UNSIGNED NOT NULL,"
873 " id INT UNSIGNED NOT NULL,"
874 " version INT UNSIGNED NOT NULL,"
875 " type INT UNSIGNED NOT NULL,"
876 " PRIMARY KEY USING HASH (db,name) ) ENGINE=NDB CHARACTER SET latin1");
878 const int no_print_error
[6]= {ER_TABLE_EXISTS_ERROR
,
881 721, // Table already exist
883 0}; // do not print error 701 etc
884 run_query(thd
, buf
, end
, no_print_error
, TRUE
);
889 int ndbcluster_setup_binlog_table_shares(THD
*thd
)
891 if (!ndb_schema_share
&&
892 ndbcluster_check_ndb_schema_share() == 0)
894 pthread_mutex_lock(&LOCK_open
);
895 ndb_create_table_from_engine(thd
, NDB_REP_DB
, NDB_SCHEMA_TABLE
);
896 pthread_mutex_unlock(&LOCK_open
);
897 if (!ndb_schema_share
)
899 ndbcluster_create_schema_table(thd
);
900 // always make sure we create the 'schema' first
901 if (!ndb_schema_share
)
905 if (!ndb_apply_status_share
&&
906 ndbcluster_check_ndb_apply_status_share() == 0)
908 pthread_mutex_lock(&LOCK_open
);
909 ndb_create_table_from_engine(thd
, NDB_REP_DB
, NDB_APPLY_TABLE
);
910 pthread_mutex_unlock(&LOCK_open
);
911 if (!ndb_apply_status_share
)
913 ndbcluster_create_ndb_apply_status_table(thd
);
914 if (!ndb_apply_status_share
)
918 if (!ndbcluster_find_all_files(thd
))
920 pthread_mutex_lock(&LOCK_open
);
921 ndb_binlog_tables_inited
= TRUE
;
922 if (ndb_extra_logging
)
923 sql_print_information("NDB Binlog: ndb tables writable");
924 close_cached_tables(NULL
, NULL
, TRUE
, FALSE
, FALSE
);
925 pthread_mutex_unlock(&LOCK_open
);
926 /* Signal injector thread that all is setup */
927 pthread_cond_signal(&injector_cond
);
933 Defines and struct for schema table.
934 Should reflect table definition above.
936 #define SCHEMA_DB_I 0u
937 #define SCHEMA_NAME_I 1u
938 #define SCHEMA_SLOCK_I 2u
939 #define SCHEMA_QUERY_I 3u
940 #define SCHEMA_NODE_ID_I 4u
941 #define SCHEMA_EPOCH_I 5u
942 #define SCHEMA_ID_I 6u
943 #define SCHEMA_VERSION_I 7u
944 #define SCHEMA_TYPE_I 8u
945 #define SCHEMA_SIZE 9u
946 #define SCHEMA_SLOCK_SIZE 32u
948 struct Cluster_schema
955 uint32 slock
[SCHEMA_SLOCK_SIZE
/4];
956 unsigned short query_length
;
967 Transfer schema table data into corresponding struct
969 static void ndbcluster_get_schema(NDB_SHARE
*share
,
972 TABLE
*table
= share
->table
;
974 /* unpack blob values */
975 uchar
* blobs_buffer
= 0;
976 uint blobs_buffer_size
= 0;
977 my_bitmap_map
*old_map
= dbug_tmp_use_all_columns(table
, table
->read_set
);
979 ptrdiff_t ptrdiff
= 0;
980 int ret
= get_ndb_blobs_value(table
, share
->ndb_value
[0],
981 blobs_buffer
, blobs_buffer_size
,
985 my_free(blobs_buffer
, MYF(MY_ALLOW_ZERO_PTR
));
986 DBUG_PRINT("info", ("blob read error"));
990 /* db varchar 1 length uchar */
992 s
->db_length
= *(uint8
*)(*field
)->ptr
;
993 DBUG_ASSERT(s
->db_length
<= (*field
)->field_length
);
994 DBUG_ASSERT((*field
)->field_length
+ 1 == sizeof(s
->db
));
995 memcpy(s
->db
, (*field
)->ptr
+ 1, s
->db_length
);
996 s
->db
[s
->db_length
]= 0;
997 /* name varchar 1 length uchar */
999 s
->name_length
= *(uint8
*)(*field
)->ptr
;
1000 DBUG_ASSERT(s
->name_length
<= (*field
)->field_length
);
1001 DBUG_ASSERT((*field
)->field_length
+ 1 == sizeof(s
->name
));
1002 memcpy(s
->name
, (*field
)->ptr
+ 1, s
->name_length
);
1003 s
->name
[s
->name_length
]= 0;
1004 /* slock fixed length */
1006 s
->slock_length
= (*field
)->field_length
;
1007 DBUG_ASSERT((*field
)->field_length
== sizeof(s
->slock
));
1008 memcpy(s
->slock
, (*field
)->ptr
, s
->slock_length
);
1012 Field_blob
*field_blob
= (Field_blob
*)(*field
);
1013 uint blob_len
= field_blob
->get_length((*field
)->ptr
);
1015 field_blob
->get_ptr(&blob_ptr
);
1016 DBUG_ASSERT(blob_len
== 0 || blob_ptr
!= 0);
1017 s
->query_length
= blob_len
;
1018 s
->query
= sql_strmake((char*) blob_ptr
, blob_len
);
1022 s
->node_id
= ((Field_long
*)*field
)->val_int();
1025 s
->epoch
= ((Field_long
*)*field
)->val_int();
1028 s
->id
= ((Field_long
*)*field
)->val_int();
1031 s
->version
= ((Field_long
*)*field
)->val_int();
1034 s
->type
= ((Field_long
*)*field
)->val_int();
1035 /* free blobs buffer */
1036 my_free(blobs_buffer
, MYF(MY_ALLOW_ZERO_PTR
));
1037 dbug_tmp_restore_column_map(table
->read_set
, old_map
);
1041 helper function to pack a ndb varchar
1043 char *ndb_pack_varchar(const NDBCOL
*col
, char *buf
,
1044 const char *str
, int sz
)
1046 switch (col
->getArrayType())
1048 case NDBCOL::ArrayTypeFixed
:
1049 memcpy(buf
, str
, sz
);
1051 case NDBCOL::ArrayTypeShortVar
:
1052 *(uchar
*)buf
= (uchar
)sz
;
1053 memcpy(buf
+ 1, str
, sz
);
1055 case NDBCOL::ArrayTypeMediumVar
:
1057 memcpy(buf
+ 2, str
, sz
);
1064 acknowledge handling of schema operation
1067 ndbcluster_update_slock(THD
*thd
,
1069 const char *table_name
)
1071 DBUG_ENTER("ndbcluster_update_slock");
1072 if (!ndb_schema_share
)
1077 const NdbError
*ndb_error
= 0;
1078 uint32 node_id
= g_ndb_cluster_connection
->node_id();
1079 Ndb
*ndb
= check_ndb_in_thd(thd
);
1080 char save_db
[FN_HEADLEN
];
1081 strcpy(save_db
, ndb
->getDatabaseName());
1083 char tmp_buf
[FN_REFLEN
];
1084 NDBDICT
*dict
= ndb
->getDictionary();
1085 ndb
->setDatabaseName(NDB_REP_DB
);
1086 Ndb_table_guard
ndbtab_g(dict
, NDB_SCHEMA_TABLE
);
1087 const NDBTAB
*ndbtab
= ndbtab_g
.get_table();
1088 NdbTransaction
*trans
= 0;
1090 int retry_sleep
= 10; /* 10 milliseconds, transaction */
1091 const NDBCOL
*col
[SCHEMA_SIZE
];
1092 unsigned sz
[SCHEMA_SIZE
];
1095 uint32 bitbuf
[SCHEMA_SLOCK_SIZE
/4];
1096 bitmap_init(&slock
, bitbuf
, sizeof(bitbuf
)*8, false);
1106 for (i
= 0; i
< SCHEMA_SIZE
; i
++)
1108 col
[i
]= ndbtab
->getColumn(i
);
1109 if (i
!= SCHEMA_QUERY_I
)
1111 sz
[i
]= col
[i
]->getLength();
1112 DBUG_ASSERT(sz
[i
] <= sizeof(tmp_buf
));
1119 if ((trans
= ndb
->startTransaction()) == 0)
1122 NdbOperation
*op
= 0;
1125 /* read the bitmap exclusive */
1126 r
|= (op
= trans
->getNdbOperation(ndbtab
)) == 0;
1127 DBUG_ASSERT(r
== 0);
1128 r
|= op
->readTupleExclusive();
1129 DBUG_ASSERT(r
== 0);
1132 ndb_pack_varchar(col
[SCHEMA_DB_I
], tmp_buf
, db
, strlen(db
));
1133 r
|= op
->equal(SCHEMA_DB_I
, tmp_buf
);
1134 DBUG_ASSERT(r
== 0);
1136 ndb_pack_varchar(col
[SCHEMA_NAME_I
], tmp_buf
, table_name
,
1137 strlen(table_name
));
1138 r
|= op
->equal(SCHEMA_NAME_I
, tmp_buf
);
1139 DBUG_ASSERT(r
== 0);
1141 r
|= op
->getValue(SCHEMA_SLOCK_I
, (char*)slock
.bitmap
) == 0;
1142 DBUG_ASSERT(r
== 0);
1144 if (trans
->execute(NdbTransaction::NoCommit
))
1146 bitmap_clear_bit(&slock
, node_id
);
1148 NdbOperation
*op
= 0;
1151 /* now update the tuple */
1152 r
|= (op
= trans
->getNdbOperation(ndbtab
)) == 0;
1153 DBUG_ASSERT(r
== 0);
1154 r
|= op
->updateTuple();
1155 DBUG_ASSERT(r
== 0);
1158 ndb_pack_varchar(col
[SCHEMA_DB_I
], tmp_buf
, db
, strlen(db
));
1159 r
|= op
->equal(SCHEMA_DB_I
, tmp_buf
);
1160 DBUG_ASSERT(r
== 0);
1162 ndb_pack_varchar(col
[SCHEMA_NAME_I
], tmp_buf
, table_name
,
1163 strlen(table_name
));
1164 r
|= op
->equal(SCHEMA_NAME_I
, tmp_buf
);
1165 DBUG_ASSERT(r
== 0);
1167 r
|= op
->setValue(SCHEMA_SLOCK_I
, (char*)slock
.bitmap
);
1168 DBUG_ASSERT(r
== 0);
1170 r
|= op
->setValue(SCHEMA_NODE_ID_I
, node_id
);
1171 DBUG_ASSERT(r
== 0);
1173 r
|= op
->setValue(SCHEMA_TYPE_I
, (uint32
)SOT_CLEAR_SLOCK
);
1174 DBUG_ASSERT(r
== 0);
1176 if (trans
->execute(NdbTransaction::Commit
) == 0)
1178 dict
->forceGCPWait();
1179 DBUG_PRINT("info", ("node %d cleared lock on '%s.%s'",
1180 node_id
, db
, table_name
));
1184 const NdbError
*this_error
= trans
?
1185 &trans
->getNdbError() : &ndb
->getNdbError();
1186 if (this_error
->status
== NdbError::TemporaryError
)
1191 ndb
->closeTransaction(trans
);
1192 my_sleep(retry_sleep
);
1196 ndb_error
= this_error
;
1203 my_snprintf(buf
, sizeof(buf
), "Could not release lock on '%s.%s'",
1205 push_warning_printf(thd
, MYSQL_ERROR::WARN_LEVEL_ERROR
,
1206 ER_GET_ERRMSG
, ER(ER_GET_ERRMSG
),
1207 ndb_error
->code
, ndb_error
->message
, buf
);
1210 ndb
->closeTransaction(trans
);
1211 ndb
->setDatabaseName(save_db
);
1216 log query in schema table
1218 static void ndb_report_waiting(const char *key
,
1223 ulonglong ndb_latest_epoch
= 0;
1224 const char *proc_info
= "<no info>";
1225 pthread_mutex_lock(&injector_mutex
);
1227 ndb_latest_epoch
= injector_ndb
->getLatestGCI();
1229 proc_info
= injector_thd
->proc_info
;
1230 pthread_mutex_unlock(&injector_mutex
);
1231 sql_print_information("NDB %s:"
1232 " waiting max %u sec for %s %s."
1233 " epochs: (%u,%u,%u)"
1234 " injector proc_info: %s"
1235 ,key
, the_time
, op
, obj
1236 ,(uint
)ndb_latest_handled_binlog_epoch
1237 ,(uint
)ndb_latest_received_binlog_epoch
1238 ,(uint
)ndb_latest_epoch
1243 int ndbcluster_log_schema_op(THD
*thd
, NDB_SHARE
*share
,
1244 const char *query
, int query_length
,
1245 const char *db
, const char *table_name
,
1246 uint32 ndb_table_id
,
1247 uint32 ndb_table_version
,
1248 enum SCHEMA_OP_TYPE type
,
1249 const char *new_db
, const char *new_table_name
,
1252 DBUG_ENTER("ndbcluster_log_schema_op");
1253 Thd_ndb
*thd_ndb
= get_thd_ndb(thd
);
1256 if (!(thd_ndb
= ha_ndbcluster::seize_thd_ndb()))
1258 sql_print_error("Could not allocate Thd_ndb object");
1261 set_thd_ndb(thd
, thd_ndb
);
1265 ("query: %s db: %s table_name: %s thd_ndb->options: %d",
1266 query
, db
, table_name
, thd_ndb
->options
));
1267 if (!ndb_schema_share
|| thd_ndb
->options
& TNO_NO_LOG_SCHEMA_OP
)
1272 char tmp_buf2
[FN_REFLEN
];
1273 char quoted_table1
[2 + 2 * FN_REFLEN
+ 1];
1274 char quoted_db1
[2 + 2 * FN_REFLEN
+ 1];
1275 char quoted_db2
[2 + 2 * FN_REFLEN
+ 1];
1276 char quoted_table2
[2 + 2 * FN_REFLEN
+ 1];
1278 const char *type_str
;
1281 case SOT_DROP_TABLE
:
1282 /* drop database command, do not log at drop table */
1283 if (thd
->lex
->sql_command
== SQLCOM_DROP_DB
)
1285 /* redo the drop table query as it may contain several tables */
1287 id_length
= my_strmov_quoted_identifier (thd
, (char *) quoted_table1
,
1289 quoted_table1
[id_length
]= '\0';
1290 query_length
= (uint
) (strxmov(tmp_buf2
, "drop table ",
1291 quoted_table1
, NullS
) - tmp_buf2
);
1292 type_str
= "drop table";
1294 case SOT_RENAME_TABLE
:
1295 /* redo the rename table query as is may contain several tables */
1297 id_length
= my_strmov_quoted_identifier (thd
, (char *) quoted_db1
,
1299 quoted_db1
[id_length
]= '\0';
1300 id_length
= my_strmov_quoted_identifier (thd
, (char *) quoted_table1
,
1302 quoted_table1
[id_length
]= '\0';
1303 id_length
= my_strmov_quoted_identifier (thd
, (char *) quoted_db2
,
1305 quoted_db2
[id_length
]= '\0';
1306 id_length
= my_strmov_quoted_identifier (thd
, (char *) quoted_table2
,
1308 quoted_table2
[id_length
]= '\0';
1309 query_length
= (uint
) (strxmov(tmp_buf2
, "rename table ",
1310 quoted_db1
, ".", quoted_table1
, " to ",
1311 quoted_db2
, ".", quoted_table2
, NullS
) - tmp_buf2
);
1312 type_str
= "rename table";
1314 case SOT_CREATE_TABLE
:
1315 type_str
= "create table";
1317 case SOT_ALTER_TABLE
:
1318 type_str
= "alter table";
1321 type_str
= "drop db";
1324 type_str
= "create db";
1327 type_str
= "alter db";
1329 case SOT_TABLESPACE
:
1330 type_str
= "tablespace";
1332 case SOT_LOGFILE_GROUP
:
1333 type_str
= "logfile group";
1335 case SOT_TRUNCATE_TABLE
:
1336 type_str
= "truncate table";
1339 abort(); /* should not happen, programming error */
1342 NDB_SCHEMA_OBJECT
*ndb_schema_object
;
1344 char key
[FN_REFLEN
+ 1];
1345 build_table_filename(key
, sizeof(key
) - 1, db
, table_name
, "", 0);
1346 ndb_schema_object
= ndb_get_schema_object(key
, TRUE
, FALSE
);
1349 const NdbError
*ndb_error
= 0;
1350 uint32 node_id
= g_ndb_cluster_connection
->node_id();
1352 MY_BITMAP schema_subscribers
;
1353 uint32 bitbuf
[sizeof(ndb_schema_object
->slock
)/4];
1354 char bitbuf_e
[sizeof(bitbuf
)];
1355 bzero(bitbuf_e
, sizeof(bitbuf_e
));
1358 int no_storage_nodes
= g_ndb_cluster_connection
->no_db_nodes();
1359 bitmap_init(&schema_subscribers
, bitbuf
, sizeof(bitbuf
)*8, FALSE
);
1360 bitmap_set_all(&schema_subscribers
);
1362 /* begin protect ndb_schema_share */
1363 pthread_mutex_lock(&ndb_schema_share_mutex
);
1364 if (ndb_schema_share
== 0)
1366 pthread_mutex_unlock(&ndb_schema_share_mutex
);
1367 if (ndb_schema_object
)
1368 ndb_free_schema_object(&ndb_schema_object
, FALSE
);
1371 (void) pthread_mutex_lock(&ndb_schema_share
->mutex
);
1372 for (i
= 0; i
< no_storage_nodes
; i
++)
1374 MY_BITMAP
*table_subscribers
= &ndb_schema_share
->subscriber_bitmap
[i
];
1375 if (!bitmap_is_clear_all(table_subscribers
))
1377 bitmap_intersect(&schema_subscribers
,
1382 (void) pthread_mutex_unlock(&ndb_schema_share
->mutex
);
1383 pthread_mutex_unlock(&ndb_schema_share_mutex
);
1384 /* end protect ndb_schema_share */
1388 bitmap_clear_bit(&schema_subscribers
, node_id
);
1390 if setting own acknowledge bit it is important that
1391 no other mysqld's are registred, as subsequent code
1392 will cause the original event to be hidden (by blob
1395 if (bitmap_is_clear_all(&schema_subscribers
))
1396 bitmap_set_bit(&schema_subscribers
, node_id
);
1399 bitmap_clear_all(&schema_subscribers
);
1401 if (ndb_schema_object
)
1403 (void) pthread_mutex_lock(&ndb_schema_object
->mutex
);
1404 memcpy(ndb_schema_object
->slock
, schema_subscribers
.bitmap
,
1405 sizeof(ndb_schema_object
->slock
));
1406 (void) pthread_mutex_unlock(&ndb_schema_object
->mutex
);
1409 DBUG_DUMP("schema_subscribers", (uchar
*)schema_subscribers
.bitmap
,
1410 no_bytes_in_map(&schema_subscribers
));
1411 DBUG_PRINT("info", ("bitmap_is_clear_all(&schema_subscribers): %d",
1412 bitmap_is_clear_all(&schema_subscribers
)));
1415 Ndb
*ndb
= thd_ndb
->ndb
;
1416 char save_db
[FN_REFLEN
];
1417 strcpy(save_db
, ndb
->getDatabaseName());
1419 char tmp_buf
[FN_REFLEN
];
1420 NDBDICT
*dict
= ndb
->getDictionary();
1421 ndb
->setDatabaseName(NDB_REP_DB
);
1422 Ndb_table_guard
ndbtab_g(dict
, NDB_SCHEMA_TABLE
);
1423 const NDBTAB
*ndbtab
= ndbtab_g
.get_table();
1424 NdbTransaction
*trans
= 0;
1426 int retry_sleep
= 10; /* 10 milliseconds, transaction */
1427 const NDBCOL
*col
[SCHEMA_SIZE
];
1428 unsigned sz
[SCHEMA_SIZE
];
1432 if (strcmp(NDB_REP_DB
, db
) != 0 ||
1433 strcmp(NDB_SCHEMA_TABLE
, table_name
))
1435 ndb_error
= &dict
->getNdbError();
1442 for (i
= 0; i
< SCHEMA_SIZE
; i
++)
1444 col
[i
]= ndbtab
->getColumn(i
);
1445 if (i
!= SCHEMA_QUERY_I
)
1447 sz
[i
]= col
[i
]->getLength();
1448 DBUG_ASSERT(sz
[i
] <= sizeof(tmp_buf
));
1455 const char *log_db
= db
;
1456 const char *log_tab
= table_name
;
1457 const char *log_subscribers
= (char*)schema_subscribers
.bitmap
;
1458 uint32 log_type
= (uint32
)type
;
1459 if ((trans
= ndb
->startTransaction()) == 0)
1463 NdbOperation
*op
= 0;
1465 r
|= (op
= trans
->getNdbOperation(ndbtab
)) == 0;
1466 DBUG_ASSERT(r
== 0);
1467 r
|= op
->writeTuple();
1468 DBUG_ASSERT(r
== 0);
1471 ndb_pack_varchar(col
[SCHEMA_DB_I
], tmp_buf
, log_db
, strlen(log_db
));
1472 r
|= op
->equal(SCHEMA_DB_I
, tmp_buf
);
1473 DBUG_ASSERT(r
== 0);
1475 ndb_pack_varchar(col
[SCHEMA_NAME_I
], tmp_buf
, log_tab
,
1477 r
|= op
->equal(SCHEMA_NAME_I
, tmp_buf
);
1478 DBUG_ASSERT(r
== 0);
1480 DBUG_ASSERT(sz
[SCHEMA_SLOCK_I
] == sizeof(bitbuf
));
1481 r
|= op
->setValue(SCHEMA_SLOCK_I
, log_subscribers
);
1482 DBUG_ASSERT(r
== 0);
1485 NdbBlob
*ndb_blob
= op
->getBlobHandle(SCHEMA_QUERY_I
);
1486 DBUG_ASSERT(ndb_blob
!= 0);
1487 uint blob_len
= query_length
;
1488 const char* blob_ptr
= query
;
1489 r
|= ndb_blob
->setValue(blob_ptr
, blob_len
);
1490 DBUG_ASSERT(r
== 0);
1493 r
|= op
->setValue(SCHEMA_NODE_ID_I
, node_id
);
1494 DBUG_ASSERT(r
== 0);
1496 r
|= op
->setValue(SCHEMA_EPOCH_I
, epoch
);
1497 DBUG_ASSERT(r
== 0);
1499 r
|= op
->setValue(SCHEMA_ID_I
, ndb_table_id
);
1500 DBUG_ASSERT(r
== 0);
1502 r
|= op
->setValue(SCHEMA_VERSION_I
, ndb_table_version
);
1503 DBUG_ASSERT(r
== 0);
1505 r
|= op
->setValue(SCHEMA_TYPE_I
, log_type
);
1506 DBUG_ASSERT(r
== 0);
1508 if (!(thd
->options
& OPTION_BIN_LOG
))
1509 r
|= op
->setAnyValue(NDB_ANYVALUE_FOR_NOLOGGING
);
1511 r
|= op
->setAnyValue(thd
->server_id
);
1512 DBUG_ASSERT(r
== 0);
1513 if (log_db
!= new_db
&& new_db
&& new_table_name
)
1516 log_tab
= new_table_name
;
1517 log_subscribers
= bitbuf_e
; // no ack expected on this
1518 log_type
= (uint32
)SOT_RENAME_TABLE_NEW
;
1523 if (trans
->execute(NdbTransaction::Commit
) == 0)
1525 DBUG_PRINT("info", ("logged: %s", query
));
1529 const NdbError
*this_error
= trans
?
1530 &trans
->getNdbError() : &ndb
->getNdbError();
1531 if (this_error
->status
== NdbError::TemporaryError
)
1536 ndb
->closeTransaction(trans
);
1537 my_sleep(retry_sleep
);
1541 ndb_error
= this_error
;
1546 push_warning_printf(thd
, MYSQL_ERROR::WARN_LEVEL_ERROR
,
1547 ER_GET_ERRMSG
, ER(ER_GET_ERRMSG
),
1550 "Could not log query '%s' on other mysqld's");
1553 ndb
->closeTransaction(trans
);
1554 ndb
->setDatabaseName(save_db
);
1557 Wait for other mysqld's to acknowledge the table operation
1559 if (ndb_error
== 0 &&
1560 !bitmap_is_clear_all(&schema_subscribers
))
1563 if own nodeid is set we are a single mysqld registred
1564 as an optimization we update the slock directly
1566 if (bitmap_is_set(&schema_subscribers
, node_id
))
1567 ndbcluster_update_slock(thd
, db
, table_name
);
1569 dict
->forceGCPWait();
1571 int max_timeout
= opt_ndb_sync_timeout
;
1572 (void) pthread_mutex_lock(&ndb_schema_object
->mutex
);
1575 safe_mutex_assert_owner(&LOCK_open
);
1576 (void) pthread_mutex_unlock(&LOCK_open
);
1580 struct timespec abstime
;
1582 int no_storage_nodes
= g_ndb_cluster_connection
->no_db_nodes();
1583 set_timespec(abstime
, 1);
1584 int ret
= pthread_cond_timedwait(&injector_cond
,
1585 &ndb_schema_object
->mutex
,
1590 /* begin protect ndb_schema_share */
1591 pthread_mutex_lock(&ndb_schema_share_mutex
);
1592 if (ndb_schema_share
== 0)
1594 pthread_mutex_unlock(&ndb_schema_share_mutex
);
1597 (void) pthread_mutex_lock(&ndb_schema_share
->mutex
);
1598 for (i
= 0; i
< no_storage_nodes
; i
++)
1600 /* remove any unsubscribed from schema_subscribers */
1601 MY_BITMAP
*tmp
= &ndb_schema_share
->subscriber_bitmap
[i
];
1602 if (!bitmap_is_clear_all(tmp
))
1603 bitmap_intersect(&schema_subscribers
, tmp
);
1605 (void) pthread_mutex_unlock(&ndb_schema_share
->mutex
);
1606 pthread_mutex_unlock(&ndb_schema_share_mutex
);
1607 /* end protect ndb_schema_share */
1609 /* remove any unsubscribed from ndb_schema_object->slock */
1610 bitmap_intersect(&ndb_schema_object
->slock_bitmap
, &schema_subscribers
);
1612 DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap",
1613 (uchar
*)ndb_schema_object
->slock_bitmap
.bitmap
,
1614 no_bytes_in_map(&ndb_schema_object
->slock_bitmap
));
1616 if (bitmap_is_clear_all(&ndb_schema_object
->slock_bitmap
))
1622 if (max_timeout
== 0)
1624 sql_print_error("NDB %s: distributing %s timed out. Ignoring...",
1625 type_str
, ndb_schema_object
->key
);
1628 if (ndb_extra_logging
)
1629 ndb_report_waiting(type_str
, max_timeout
,
1630 "distributing", ndb_schema_object
->key
);
1635 (void) pthread_mutex_lock(&LOCK_open
);
1637 (void) pthread_mutex_unlock(&ndb_schema_object
->mutex
);
1640 if (ndb_schema_object
)
1641 ndb_free_schema_object(&ndb_schema_object
, FALSE
);
1647 Handle _non_ data events from the storage nodes
/*
  React to a schema change event (pOp) for the table described by share.

  Visible behaviour:
    - a TE_ALTER event is classified as an on-line alter (table frm changed)
      or as a rename (table name changed)
    - the dictionary cache entry for the table is invalidated when its
      object id equals the event table's and its version is not newer
    - for a remote on-line alter the local frm file is compared with the frm
      data carried by the event and rewritten if needed; the binlog table is
      then closed and re-opened
    - for a local rename the names in share->table->s are pointed at
      share->db and share->table_name
    - otherwise the share references are released, the event operation is
      dropped and cached tables are closed when another reference remains

  A change is "remote" when pOp->getReqNodeId() differs from this server's
  node id.

  NOTE(review): the return type, the last parameter(s), braces, else
  branches and return statements are not visible in this listing.
*/
1650 ndb_handle_schema_change(THD
*thd
, Ndb
*ndb
, NdbEventOperation
*pOp
,
1653 DBUG_ENTER("ndb_handle_schema_change");
1654 TABLE
* table
= share
->table
;
1655 TABLE_SHARE
*table_share
= share
->table_share
;
1656 const char *dbname
= table_share
->db
.str
;
1657 const char *tabname
= table_share
->table_name
.str
;
1658 bool do_close_cached_tables
= FALSE
;
1659 bool is_online_alter_table
= FALSE
;
1660 bool is_rename_table
= FALSE
;
1661 bool is_remote_change
=
1662 (uint
) pOp
->getReqNodeId() != g_ndb_cluster_connection
->node_id();
/* classify a TE_ALTER event: frm change (on-line alter) or name change */
1664 if (pOp
->getEventType() == NDBEVENT::TE_ALTER
)
1666 if (pOp
->tableFrmChanged())
1668 DBUG_PRINT("info", ("NDBEVENT::TE_ALTER: table frm changed"));
1669 is_online_alter_table
= TRUE
;
1673 DBUG_PRINT("info", ("NDBEVENT::TE_ALTER: name changed"));
1674 DBUG_ASSERT(pOp
->tableNameChanged());
1675 is_rename_table
= TRUE
;
/* invalidate the cached dictionary entry when it refers to the same object
   as the event table and its version is less than or equal to the event's */
1680 ndb
->setDatabaseName(dbname
);
1681 Ndb_table_guard
ndbtab_g(ndb
->getDictionary(), tabname
);
1682 const NDBTAB
*ev_tab
= pOp
->getTable();
1683 const NDBTAB
*cache_tab
= ndbtab_g
.get_table();
1685 cache_tab
->getObjectId() == ev_tab
->getObjectId() &&
1686 cache_tab
->getObjectVersion() <= ev_tab
->getObjectVersion())
1687 ndbtab_g
.invalidate();
1691 Refresh local frm file and dictionary cache if
1692 remote on-line alter table
1694 if (is_remote_change
&& is_online_alter_table
)
1696 const char *tabname
= table_share
->table_name
.str
;
1697 char key
[FN_REFLEN
+ 1];
1698 uchar
*data
= 0, *pack_data
= 0;
1699 size_t length
, pack_length
;
1701 NDBDICT
*dict
= ndb
->getDictionary();
1702 const NDBTAB
*altered_table
= pOp
->getTable();
1704 DBUG_PRINT("info", ("Detected frm change of table %s.%s",
/*
  NOTE(review): key is declared with FN_REFLEN + 1 bytes but the length
  passed here is FN_LEN - 1; the other build_table_filename() calls in this
  file pass sizeof(key) - 1. Confirm whether the shorter bound is intended.
*/
1706 build_table_filename(key
, FN_LEN
- 1, dbname
, tabname
, NullS
, 0);
1708 If the there is no local table shadowing the altered table and
1709 it has an frm that is different than the one on disk then
1710 overwrite it with the new table definition
1712 if (!ndbcluster_check_if_local_table(dbname
, tabname
) &&
1713 readfrm(key
, &data
, &length
) == 0 &&
1714 packfrm(data
, length
, &pack_data
, &pack_length
) == 0 &&
1715 cmp_frm(altered_table
, pack_data
, pack_length
))
1717 DBUG_DUMP("frm", (uchar
*) altered_table
->getFrmData(),
1718 altered_table
->getFrmLength());
/* LOCK_open is held from here until the unlock after the table has been
   re-opened */
1719 pthread_mutex_lock(&LOCK_open
);
1720 Ndb_table_guard
ndbtab_g(dict
, tabname
);
1721 const NDBTAB
*old
= ndbtab_g
.get_table();
1723 old
->getObjectVersion() != altered_table
->getObjectVersion())
1724 dict
->putTable(altered_table
);
/* release the frm image read from disk before unpacking the new one */
1726 my_free((char*)data
, MYF(MY_ALLOW_ZERO_PTR
));
1728 if ((error
= unpackfrm(&data
, &length
,
1729 (const uchar
*) altered_table
->getFrmData())) ||
1730 (error
= writefrm(key
, data
, length
)))
1732 sql_print_information("NDB: Failed write frm for %s.%s, error %d",
1733 dbname
, tabname
, error
);
1736 // copy names as memory will be freed
/* NOTE(review): the strdup() results are not checked for NULL here */
1737 NdbAutoPtr
<char> a1((char *)(dbname
= strdup(dbname
)));
1738 NdbAutoPtr
<char> a2((char *)(tabname
= strdup(tabname
)));
1739 ndbcluster_binlog_close_table(thd
, share
);
1741 TABLE_LIST table_list
;
1742 bzero((char*) &table_list
,sizeof(table_list
));
1743 table_list
.db
= (char *)dbname
;
1744 table_list
.alias
= table_list
.table_name
= (char *)tabname
;
1745 close_cached_tables(thd
, &table_list
, TRUE
, FALSE
, FALSE
);
1747 if ((error
= ndbcluster_binlog_open_table(thd
, share
,
1748 table_share
, table
, 1)))
1749 sql_print_information("NDB: Failed to re-open table %s.%s",
/* reload the local pointers from share after the re-open */
1752 table
= share
->table
;
1753 table_share
= share
->table_share
;
1754 dbname
= table_share
->db
.str
;
1755 tabname
= table_share
->table_name
.str
;
1757 pthread_mutex_unlock(&LOCK_open
);
1759 my_free((char*)data
, MYF(MY_ALLOW_ZERO_PTR
));
1760 my_free((char*)pack_data
, MYF(MY_ALLOW_ZERO_PTR
));
1763 // If only frm was changed continue replicating
1764 if (is_online_alter_table
)
1766 /* Signal ha_ndbcluster::alter_table that drop is done */
1767 (void) pthread_cond_signal(&injector_cond
);
1771 (void) pthread_mutex_lock(&share
->mutex
);
1772 if (is_rename_table
&& !is_remote_change
)
1774 DBUG_PRINT("info", ("Detected name change of table %s.%s",
1775 share
->db
, share
->table_name
));
1776 /* ToDo: remove printout */
1777 if (ndb_extra_logging
)
1778 sql_print_information("NDB Binlog: rename table %s%s/%s -> %s.",
1779 share_prefix
, share
->table
->s
->db
.str
,
1780 share
->table
->s
->table_name
.str
,
1783 ndb
->setDatabaseName(share
->table
->s
->db
.str
);
1784 Ndb_table_guard
ndbtab_g(ndb
->getDictionary(),
1785 share
->table
->s
->table_name
.str
);
1786 const NDBTAB
*ev_tab
= pOp
->getTable();
1787 const NDBTAB
*cache_tab
= ndbtab_g
.get_table();
1789 cache_tab
->getObjectId() == ev_tab
->getObjectId() &&
1790 cache_tab
->getObjectVersion() <= ev_tab
->getObjectVersion())
1791 ndbtab_g
.invalidate();
1793 /* do the rename of the table in the share */
1794 share
->table
->s
->db
.str
= share
->db
;
1795 share
->table
->s
->db
.length
= strlen(share
->db
);
1796 share
->table
->s
->table_name
.str
= share
->table_name
;
1797 share
->table
->s
->table_name
.length
= strlen(share
->table_name
);
1799 DBUG_ASSERT(share
->op
== pOp
|| share
->op_old
== pOp
);
1800 if (share
->op_old
== pOp
)
1804 // either just us or drop table handling as well
1806 /* Signal ha_ndbcluster::delete/rename_table that drop is done */
1807 (void) pthread_mutex_unlock(&share
->mutex
);
1808 (void) pthread_cond_signal(&injector_cond
);
/* the share references are released while ndbcluster_mutex is held */
1810 pthread_mutex_lock(&ndbcluster_mutex
);
1811 /* ndb_share reference binlog free */
1812 DBUG_PRINT("NDB_SHARE", ("%s binlog free use_count: %u",
1813 share
->key
, share
->use_count
));
1814 free_share(&share
, TRUE
);
/* share is tested for NULL after free_share(); for a remote change that is
   not yet NSS_DROPPED the state is set to NSS_DROPPED */
1815 if (is_remote_change
&& share
&& share
->state
!= NSS_DROPPED
)
1817 DBUG_PRINT("info", ("remote change"));
1818 share
->state
= NSS_DROPPED
;
1819 if (share
->use_count
!= 1)
1821 /* open handler holding reference */
1822 /* wait with freeing create ndb_share to below */
1823 do_close_cached_tables
= TRUE
;
1827 /* ndb_share reference create free */
1828 DBUG_PRINT("NDB_SHARE", ("%s create free use_count: %u",
1829 share
->key
, share
->use_count
));
1830 free_share(&share
, TRUE
);
1836 pthread_mutex_unlock(&ndbcluster_mutex
);
1838 pOp
->setCustomData(0);
/* the event operation is dropped while injector_mutex is held */
1840 pthread_mutex_lock(&injector_mutex
);
1841 ndb
->dropEventOperation(pOp
);
1843 pthread_mutex_unlock(&injector_mutex
);
1845 if (do_close_cached_tables
)
1847 TABLE_LIST table_list
;
1848 bzero((char*) &table_list
,sizeof(table_list
));
1849 table_list
.db
= (char *)dbname
;
1850 table_list
.alias
= table_list
.table_name
= (char *)tabname
;
1851 close_cached_tables(thd
, &table_list
, FALSE
, FALSE
, FALSE
);
1852 /* ndb_share reference create free */
1853 DBUG_PRINT("NDB_SHARE", ("%s create free use_count: %u",
1854 share
->key
, share
->use_count
));
/*
  Write the query carried by a schema event to the binary log through
  thd->binlog_query().

  thd->server_id and thd->db are saved, replaced for the duration of the
  call (server_id by ::server_id when schema->any_value is 0, by
  schema->any_value in the other assignment; db by schema->db) and restored
  afterwards.

  An any_value with NDB_ANYVALUE_RESERVED bits set is handled separately
  first; a warning is printed when it is not NDB_ANYVALUE_FOR_NOLOGGING.

  NOTE(review): the remainder of that early branch and the last argument of
  the binlog_query() call are not visible in this listing.
*/
1860 static void ndb_binlog_query(THD
*thd
, Cluster_schema
*schema
)
1862 if (schema
->any_value
& NDB_ANYVALUE_RESERVED
)
1864 if (schema
->any_value
!= NDB_ANYVALUE_FOR_NOLOGGING
)
1865 sql_print_warning("NDB: unknown value for binlog signalling 0x%X, "
/* save the THD fields that are overridden below */
1870 uint32 thd_server_id_save
= thd
->server_id
;
1871 DBUG_ASSERT(sizeof(thd_server_id_save
) == sizeof(thd
->server_id
));
1872 char *thd_db_save
= thd
->db
;
1873 if (schema
->any_value
== 0)
1874 thd
->server_id
= ::server_id
;
1876 thd
->server_id
= schema
->any_value
;
1877 thd
->db
= schema
->db
;
1878 int errcode
= query_error_code(thd
, thd
->killed
== THD::NOT_KILLED
);
1879 thd
->binlog_query(THD::STMT_QUERY_TYPE
, schema
->query
,
1880 schema
->query_length
, FALSE
,
1881 schema
->name
[0] == 0 || thd
->db
[0] == 0,
/* restore the saved THD fields */
1883 thd
->server_id
= thd_server_id_save
;
1884 thd
->db
= thd_db_save
;
/*
  Handle one event received on the schema share.

  The event is only processed when the custom data of pOp is the current
  ndb_schema_share.

  TE_UPDATE / TE_INSERT: a Cluster_schema is allocated with sql_alloc() and
  filled by ndbcluster_get_schema().
    - SOT_CLEAR_SLOCK entries are queued on post_epoch_log_list
    - entries whose node_id differs from this server's are dispatched on
      their schema type; some are queued on post_epoch_log_list and
      acknowledged later through post_epoch_unlock_list, others are applied
      right away
    - when this server's bit is set in the slock bitmap the operation is
      acknowledged with ndbcluster_update_slock() or queued for
      acknowledgement after the epoch

  Other event types: cluster failure and drop release ndb_schema_share,
  TE_ALTER is forwarded to ndb_handle_schema_change(), node failure and
  (un)subscribe events update tmp_share->subscriber_bitmap[] and signal
  injector_cond; anything else is reported as an unknown non data event.

  NOTE(review): the return type, the last parameter(s), braces, break
  statements and several case labels are not visible in this listing.
*/
1888 ndb_binlog_thread_handle_schema_event(THD
*thd
, Ndb
*ndb
,
1889 NdbEventOperation
*pOp
,
1890 List
<Cluster_schema
>
1891 *post_epoch_log_list
,
1892 List
<Cluster_schema
>
1893 *post_epoch_unlock_list
,
1896 DBUG_ENTER("ndb_binlog_thread_handle_schema_event");
1897 NDB_SHARE
*tmp_share
= (NDB_SHARE
*)pOp
->getCustomData();
1898 if (tmp_share
&& ndb_schema_share
== tmp_share
)
1900 NDBEVENT::TableEvent ev_type
= pOp
->getEventType();
1901 DBUG_PRINT("enter", ("%s.%s ev_type: %d",
1902 tmp_share
->db
, tmp_share
->table_name
, ev_type
));
1903 if (ev_type
== NDBEVENT::TE_UPDATE
||
1904 ev_type
== NDBEVENT::TE_INSERT
)
/* NOTE(review): no NULL check of the sql_alloc() result is visible here */
1906 Cluster_schema
*schema
= (Cluster_schema
*)
1907 sql_alloc(sizeof(Cluster_schema
));
/* slock is a bitmap view over schema->slock */
1909 bitmap_init(&slock
, schema
->slock
, 8*SCHEMA_SLOCK_SIZE
, FALSE
);
1910 uint node_id
= g_ndb_cluster_connection
->node_id();
1912 ndbcluster_get_schema(tmp_share
, schema
);
1913 schema
->any_value
= pOp
->getAnyValue();
1915 enum SCHEMA_OP_TYPE schema_type
= (enum SCHEMA_OP_TYPE
)schema
->type
;
1917 ("%s.%s: log query_length: %d query: '%s' type: %d",
1918 schema
->db
, schema
->name
,
1919 schema
->query_length
, schema
->query
,
1921 if (schema_type
== SOT_CLEAR_SLOCK
)
1924 handle slock after epoch is completed to ensure that
1925 schema events get inserted in the binlog after any data
1928 post_epoch_log_list
->push_back(schema
, mem_root
);
/* only entries written by another node are dispatched below */
1931 if (schema
->node_id
!= node_id
)
1933 int log_query
= 0, post_epoch_unlock
= 0;
1934 switch (schema_type
)
1936 case SOT_DROP_TABLE
:
1938 case SOT_RENAME_TABLE
:
1940 case SOT_RENAME_TABLE_NEW
:
1942 case SOT_ALTER_TABLE
:
1943 post_epoch_log_list
->push_back(schema
, mem_root
);
1944 /* acknowledge this query _after_ epoch completion */
1945 post_epoch_unlock
= 1;
1947 case SOT_TRUNCATE_TABLE
:
1949 char key
[FN_REFLEN
+ 1];
1950 build_table_filename(key
, sizeof(key
) - 1,
1951 schema
->db
, schema
->name
, "", 0);
1952 /* ndb_share reference temporary, free below */
1953 NDB_SHARE
*share
= get_share(key
, 0, FALSE
, FALSE
);
1956 DBUG_PRINT("NDB_SHARE", ("%s temporary use_count: %u",
1957 share
->key
, share
->use_count
));
1959 // invalidation already handled by binlog thread
1960 if (!share
|| !share
->op
)
1963 injector_ndb
->setDatabaseName(schema
->db
);
1964 Ndb_table_guard
ndbtab_g(injector_ndb
->getDictionary(),
1966 ndbtab_g
.invalidate();
1968 TABLE_LIST table_list
;
1969 bzero((char*) &table_list
,sizeof(table_list
));
1970 table_list
.db
= schema
->db
;
1971 table_list
.alias
= table_list
.table_name
= schema
->name
;
1972 close_cached_tables(thd
, &table_list
, FALSE
, FALSE
, FALSE
);
1974 /* ndb_share reference temporary free */
1977 DBUG_PRINT("NDB_SHARE", ("%s temporary free use_count: %u",
1978 share
->key
, share
->use_count
));
/* table discovery below runs with LOCK_open held */
1983 case SOT_CREATE_TABLE
:
1984 pthread_mutex_lock(&LOCK_open
);
1985 if (ndbcluster_check_if_local_table(schema
->db
, schema
->name
))
1987 DBUG_PRINT("info", ("NDB Binlog: Skipping locally defined table '%s.%s'",
1988 schema
->db
, schema
->name
));
1989 sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' from "
1990 "binlog schema event '%s' from node %d. ",
1991 schema
->db
, schema
->name
, schema
->query
,
1994 else if (ndb_create_table_from_engine(thd
, schema
->db
, schema
->name
))
1996 sql_print_error("NDB Binlog: Could not discover table '%s.%s' from "
1997 "binlog schema event '%s' from node %d. "
1999 schema
->db
, schema
->name
, schema
->query
,
2000 schema
->node_id
, my_errno
);
2001 List_iterator_fast
<MYSQL_ERROR
> it(thd
->warn_list
);
2004 sql_print_warning("NDB Binlog: (%d)%s", err
->code
, err
->msg
);
2006 pthread_mutex_unlock(&LOCK_open
);
2010 /* Drop the database locally if it only contains ndb tables */
2011 if (! ndbcluster_check_if_local_tables_in_db(thd
, schema
->db
))
2013 const int no_print_error
[1]= {0};
2014 run_query(thd
, schema
->query
,
2015 schema
->query
+ schema
->query_length
,
2016 no_print_error
, /* print error */
2017 TRUE
); /* don't binlog the query */
2018 /* binlog dropping database after any table operations */
2019 post_epoch_log_list
->push_back(schema
, mem_root
);
2020 /* acknowledge this query _after_ epoch completion */
2021 post_epoch_unlock
= 1;
2025 /* Database contained local tables, leave it */
2026 sql_print_error("NDB Binlog: Skipping drop database '%s' since it contained local tables "
2027 "binlog schema event '%s' from node %d. ",
2028 schema
->db
, schema
->query
,
2037 const int no_print_error
[1]= {0};
2038 run_query(thd
, schema
->query
,
2039 schema
->query
+ schema
->query_length
,
2040 no_print_error
, /* print error */
2041 TRUE
); /* don't binlog the query */
2045 case SOT_TABLESPACE
:
2046 case SOT_LOGFILE_GROUP
:
2049 case SOT_CLEAR_SLOCK
:
2052 if (log_query
&& ndb_binlog_running
)
2053 ndb_binlog_query(thd
, schema
);
2054 /* signal that schema operation has been handled */
2055 DBUG_DUMP("slock", (uchar
*) schema
->slock
, schema
->slock_length
);
/* acknowledge only when this server's bit is set in the slock bitmap;
   post_epoch_unlock selects the deferred acknowledgement */
2056 if (bitmap_is_set(&slock
, node_id
))
2058 if (post_epoch_unlock
)
2059 post_epoch_unlock_list
->push_back(schema
, mem_root
);
2061 ndbcluster_update_slock(thd
, schema
->db
, schema
->name
);
2067 the normal case of UPDATE/INSERT has already been handled
2071 case NDBEVENT::TE_DELETE
:
2074 case NDBEVENT::TE_CLUSTER_FAILURE
:
2075 if (ndb_extra_logging
)
2076 sql_print_information("NDB Binlog: cluster failure for %s at epoch %u.",
2077 ndb_schema_share
->key
, (unsigned) pOp
->getGCI());
2079 case NDBEVENT::TE_DROP
:
2080 if (ndb_extra_logging
&&
2081 ndb_binlog_tables_inited
&& ndb_binlog_running
)
2082 sql_print_information("NDB Binlog: ndb tables initially "
2083 "read only on reconnect.");
2085 /* begin protect ndb_schema_share */
2086 pthread_mutex_lock(&ndb_schema_share_mutex
);
2087 /* ndb_share reference binlog extra free */
2088 DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
2089 ndb_schema_share
->key
,
2090 ndb_schema_share
->use_count
));
2091 free_share(&ndb_schema_share
);
2092 ndb_schema_share
= 0;
2093 ndb_binlog_tables_inited
= 0;
2094 pthread_mutex_unlock(&ndb_schema_share_mutex
);
2095 /* end protect ndb_schema_share */
2097 close_cached_tables(NULL
, NULL
, FALSE
, FALSE
, FALSE
);
2099 case NDBEVENT::TE_ALTER
:
2100 ndb_handle_schema_change(thd
, ndb
, pOp
, tmp_share
);
/* a failed data node loses all its subscribers: its bitmap is cleared under
   tmp_share->mutex and waiters on injector_cond are signalled */
2102 case NDBEVENT::TE_NODE_FAILURE
:
2104 uint8 node_id
= g_node_id_map
[pOp
->getNdbdNodeId()];
2105 DBUG_ASSERT(node_id
!= 0xFF);
2106 (void) pthread_mutex_lock(&tmp_share
->mutex
);
2107 bitmap_clear_all(&tmp_share
->subscriber_bitmap
[node_id
]);
2108 DBUG_PRINT("info",("NODE_FAILURE UNSUBSCRIBE[%d]", node_id
));
2109 if (ndb_extra_logging
)
/* NOTE(review): "%x%x" prints the two bitmap words without zero padding, so
   the logged bitmask can be ambiguous */
2111 sql_print_information("NDB Binlog: Node: %d, down,"
2112 " Subscriber bitmask %x%x",
2113 pOp
->getNdbdNodeId(),
2114 tmp_share
->subscriber_bitmap
[node_id
].bitmap
[1],
2115 tmp_share
->subscriber_bitmap
[node_id
].bitmap
[0]);
2117 (void) pthread_mutex_unlock(&tmp_share
->mutex
);
2118 (void) pthread_cond_signal(&injector_cond
);
/* node_id is the entry of g_node_id_map for the data node; req_id is the
   requesting node, narrowed to uint8.
   NOTE(review): confirm node ids always fit in 8 bits */
2121 case NDBEVENT::TE_SUBSCRIBE
:
2123 uint8 node_id
= g_node_id_map
[pOp
->getNdbdNodeId()];
2124 uint8 req_id
= pOp
->getReqNodeId();
2125 DBUG_ASSERT(req_id
!= 0 && node_id
!= 0xFF);
2126 (void) pthread_mutex_lock(&tmp_share
->mutex
);
2127 bitmap_set_bit(&tmp_share
->subscriber_bitmap
[node_id
], req_id
);
2128 DBUG_PRINT("info",("SUBSCRIBE[%d] %d", node_id
, req_id
));
2129 if (ndb_extra_logging
)
2131 sql_print_information("NDB Binlog: Node: %d, subscribe from node %d,"
2132 " Subscriber bitmask %x%x",
2133 pOp
->getNdbdNodeId(),
2135 tmp_share
->subscriber_bitmap
[node_id
].bitmap
[1],
2136 tmp_share
->subscriber_bitmap
[node_id
].bitmap
[0]);
2138 (void) pthread_mutex_unlock(&tmp_share
->mutex
);
2139 (void) pthread_cond_signal(&injector_cond
);
2142 case NDBEVENT::TE_UNSUBSCRIBE
:
2144 uint8 node_id
= g_node_id_map
[pOp
->getNdbdNodeId()];
2145 uint8 req_id
= pOp
->getReqNodeId();
2146 DBUG_ASSERT(req_id
!= 0 && node_id
!= 0xFF);
2147 (void) pthread_mutex_lock(&tmp_share
->mutex
);
2148 bitmap_clear_bit(&tmp_share
->subscriber_bitmap
[node_id
], req_id
);
2149 DBUG_PRINT("info",("UNSUBSCRIBE[%d] %d", node_id
, req_id
));
2150 if (ndb_extra_logging
)
2152 sql_print_information("NDB Binlog: Node: %d, unsubscribe from node %d,"
2153 " Subscriber bitmask %x%x",
2154 pOp
->getNdbdNodeId(),
2156 tmp_share
->subscriber_bitmap
[node_id
].bitmap
[1],
2157 tmp_share
->subscriber_bitmap
[node_id
].bitmap
[0]);
2159 (void) pthread_mutex_unlock(&tmp_share
->mutex
);
2160 (void) pthread_cond_signal(&injector_cond
);
2164 sql_print_error("NDB Binlog: unknown non data event %d for %s. "
2165 "Ignoring...", (unsigned) ev_type
, tmp_share
->key
);
2172 process any operations that should be done after
2173 the epoch is complete
/*
  Process the schema entries that were deferred until the epoch completed.

  Every entry popped from post_epoch_log_list is handled as follows:
    - SOT_CLEAR_SLOCK: the NDB_SCHEMA_OBJECT stored under the table's key in
      ndb_schema_objects gets the entry's slock copied in, and injector_cond
      is signalled
    - other types: a temporary NDB_SHARE reference is taken with get_share()
      and the entry is dispatched on its schema type (cache invalidation,
      closing cached tables, table discovery via
      ndb_create_table_from_engine()); the query is written to the binlog
      when ndb_binlog_running and log_query are both set

  Afterwards every entry popped from post_epoch_unlock_list is acknowledged
  with ndbcluster_update_slock().

  NOTE(review): the return type, braces, break statements and parts of the
  switch are not visible in this listing.
*/
2176 ndb_binlog_thread_handle_schema_event_post_epoch(THD
*thd
,
2177 List
<Cluster_schema
>
2178 *post_epoch_log_list
,
2179 List
<Cluster_schema
>
2180 *post_epoch_unlock_list
)
2182 if (post_epoch_log_list
->elements
== 0)
2184 DBUG_ENTER("ndb_binlog_thread_handle_schema_event_post_epoch");
2185 Cluster_schema
*schema
;
2186 while ((schema
= post_epoch_log_list
->pop()))
2189 ("%s.%s: log query_length: %d query: '%s' type: %d",
2190 schema
->db
, schema
->name
,
2191 schema
->query_length
, schema
->query
,
2195 enum SCHEMA_OP_TYPE schema_type
= (enum SCHEMA_OP_TYPE
)schema
->type
;
2196 char key
[FN_REFLEN
+ 1];
2197 build_table_filename(key
, sizeof(key
) - 1, schema
->db
, schema
->name
, "", 0);
2198 if (schema_type
== SOT_CLEAR_SLOCK
)
/* the hash lookup is done under ndbcluster_mutex; the slock copy is done
   under the schema object's own mutex */
2200 pthread_mutex_lock(&ndbcluster_mutex
);
2201 NDB_SCHEMA_OBJECT
*ndb_schema_object
=
2202 (NDB_SCHEMA_OBJECT
*) hash_search(&ndb_schema_objects
,
2203 (uchar
*) key
, strlen(key
));
2204 if (ndb_schema_object
)
2206 pthread_mutex_lock(&ndb_schema_object
->mutex
);
2207 memcpy(ndb_schema_object
->slock
, schema
->slock
,
2208 sizeof(ndb_schema_object
->slock
));
2209 DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap",
2210 (uchar
*)ndb_schema_object
->slock_bitmap
.bitmap
,
2211 no_bytes_in_map(&ndb_schema_object
->slock_bitmap
));
2212 pthread_mutex_unlock(&ndb_schema_object
->mutex
);
2213 pthread_cond_signal(&injector_cond
);
2215 pthread_mutex_unlock(&ndbcluster_mutex
);
2218 /* ndb_share reference temporary, free below */
2219 NDB_SHARE
*share
= get_share(key
, 0, FALSE
, FALSE
);
2222 DBUG_PRINT("NDB_SHARE", ("%s temporary use_count: %u",
2223 share
->key
, share
->use_count
));
2225 switch (schema_type
)
2230 case SOT_DROP_TABLE
:
2232 // invalidation already handled by binlog thread
2233 if (share
&& share
->op
)
2238 case SOT_RENAME_TABLE
:
2240 case SOT_ALTER_TABLE
:
2241 // invalidation already handled by binlog thread
2242 if (!share
|| !share
->op
)
2245 injector_ndb
->setDatabaseName(schema
->db
);
2246 Ndb_table_guard
ndbtab_g(injector_ndb
->getDictionary(),
2248 ndbtab_g
.invalidate();
2250 TABLE_LIST table_list
;
2251 bzero((char*) &table_list
,sizeof(table_list
));
2252 table_list
.db
= schema
->db
;
2253 table_list
.alias
= table_list
.table_name
= schema
->name
;
2254 close_cached_tables(thd
, &table_list
, FALSE
, FALSE
, FALSE
);
2256 if (schema_type
!= SOT_ALTER_TABLE
)
2259 case SOT_RENAME_TABLE_NEW
:
2261 if (ndb_binlog_running
&& (!share
|| !share
->op
))
2264 we need to free any share here as command below
2265 may need to call handle_trailing_share
2269 /* ndb_share reference temporary free */
2270 DBUG_PRINT("NDB_SHARE", ("%s temporary free use_count: %u",
2271 share
->key
, share
->use_count
));
/* table discovery below runs with LOCK_open held */
2275 pthread_mutex_lock(&LOCK_open
);
2276 if (ndbcluster_check_if_local_table(schema
->db
, schema
->name
))
2278 DBUG_PRINT("info", ("NDB Binlog: Skipping locally defined table '%s.%s'",
2279 schema
->db
, schema
->name
));
2280 sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' from "
2281 "binlog schema event '%s' from node %d. ",
2282 schema
->db
, schema
->name
, schema
->query
,
2285 else if (ndb_create_table_from_engine(thd
, schema
->db
, schema
->name
))
2287 sql_print_error("NDB Binlog: Could not discover table '%s.%s' from "
2288 "binlog schema event '%s' from node %d. my_errno: %d",
2289 schema
->db
, schema
->name
, schema
->query
,
2290 schema
->node_id
, my_errno
);
2291 List_iterator_fast
<MYSQL_ERROR
> it(thd
->warn_list
);
2294 sql_print_warning("NDB Binlog: (%d)%s", err
->code
, err
->msg
);
2296 pthread_mutex_unlock(&LOCK_open
);
2304 /* ndb_share reference temporary free */
2305 DBUG_PRINT("NDB_SHARE", ("%s temporary free use_count: %u",
2306 share
->key
, share
->use_count
));
2311 if (ndb_binlog_running
&& log_query
)
2312 ndb_binlog_query(thd
, schema
);
/* acknowledge every operation whose acknowledgement was deferred */
2314 while ((schema
= post_epoch_unlock_list
->pop()))
2316 ndbcluster_update_slock(thd
, schema
->db
, schema
->name
);
2322 Timer class for doing performance measurements
2325 /*********************************************************************
2326 Internal helper functions for handling of the cluster replication tables
2329 *********************************************************************/
2332 struct to hold the data to be inserted into the
2333 ndb_binlog_index table
/*
  One row of data for the ndb_binlog_index table.

  The fields visible here are the master log file name and position and
  counters for inserts, updates, deletes and schema operations.
  NOTE(review): ndb_add_ndb_binlog_index() also stores a row.gci value; the
  declaration of that member and the closing of the struct are not visible
  in this listing.
*/
2335 struct ndb_binlog_index_row
{
2337 const char *master_log_file
;
2338 ulonglong master_log_pos
;
2339 ulonglong n_inserts
;
2340 ulonglong n_updates
;
2341 ulonglong n_deletes
;
2342 ulonglong n_schemaops
;
2346 Open the ndb_binlog_index table
/*
  Open the NDB_REP_TABLE table for writing.

  tables is zeroed and filled in (alias and table name, TL_WRITE lock type,
  FRMTYPE_TABLE required type) and passed to open_tables() with
  MYSQL_LOCK_IGNORE_FLUSH. On success the opened TABLE is stored in
  *ndb_binlog_index and use_all_columns() is called on it. On failure an
  error is written to the server log.

  thd->proc_info is set to an "Opening ..." text while the table is being
  opened and is restored on both paths.

  NOTE(review): the db assignment, the counter declaration and the return
  statements are not visible in this listing.
*/
2348 static int open_ndb_binlog_index(THD
*thd
, TABLE_LIST
*tables
,
2349 TABLE
**ndb_binlog_index
)
2351 static char repdb
[]= NDB_REP_DB
;
2352 static char reptable
[]= NDB_REP_TABLE
;
2353 const char *save_proc_info
= thd
->proc_info
;
2355 bzero((char*) tables
, sizeof(*tables
));
2357 tables
->alias
= tables
->table_name
= reptable
;
2358 tables
->lock_type
= TL_WRITE
;
2359 thd
->proc_info
= "Opening " NDB_REP_DB
"." NDB_REP_TABLE
;
2360 tables
->required_type
= FRMTYPE_TABLE
;
/* failure path: log the reason and restore proc_info */
2363 if (open_tables(thd
, &tables
, &counter
, MYSQL_LOCK_IGNORE_FLUSH
))
2366 sql_print_error("NDB Binlog: Opening ndb_binlog_index: killed");
2368 sql_print_error("NDB Binlog: Opening ndb_binlog_index: %d, '%s'",
2369 thd
->main_da
.sql_errno(),
2370 thd
->main_da
.message());
2371 thd
->proc_info
= save_proc_info
;
/* success path: hand the opened table back to the caller */
2374 *ndb_binlog_index
= tables
->table
;
2375 thd
->proc_info
= save_proc_info
;
2376 (*ndb_binlog_index
)->use_all_columns();
2382 Insert one row in the ndb_binlog_index
2385 int ndb_add_ndb_binlog_index(THD
*thd
, void *_row
)
2387 ndb_binlog_index_row
&row
= *(ndb_binlog_index_row
*) _row
;
2391 Turn of binlogging to prevent the table changes to be written to
2394 ulong saved_options
= thd
->options
;
2395 thd
->options
&= ~(OPTION_BIN_LOG
);
2397 for ( ; ; ) /* loop for need_reopen */
2399 if (!ndb_binlog_index
&& open_ndb_binlog_index(thd
, &binlog_tables
, &ndb_binlog_index
))
2402 goto add_ndb_binlog_index_err
;
2405 if (lock_tables(thd
, &binlog_tables
, 1, &need_reopen
))
2409 TABLE_LIST
*p_binlog_tables
= &binlog_tables
;
2410 close_tables_for_reopen(thd
, &p_binlog_tables
);
2411 ndb_binlog_index
= 0;
2414 sql_print_error("NDB Binlog: Unable to lock table ndb_binlog_index");
2416 goto add_ndb_binlog_index_err
;
2422 Intialize ndb_binlog_index->record[0]
2424 empty_record(ndb_binlog_index
);
2426 ndb_binlog_index
->field
[0]->store(row
.master_log_pos
);
2427 ndb_binlog_index
->field
[1]->store(row
.master_log_file
,
2428 strlen(row
.master_log_file
),
2430 ndb_binlog_index
->field
[2]->store(row
.gci
);
2431 ndb_binlog_index
->field
[3]->store(row
.n_inserts
);
2432 ndb_binlog_index
->field
[4]->store(row
.n_updates
);
2433 ndb_binlog_index
->field
[5]->store(row
.n_deletes
);
2434 ndb_binlog_index
->field
[6]->store(row
.n_schemaops
);
2436 if ((error
= ndb_binlog_index
->file
->ha_write_row(ndb_binlog_index
->record
[0])))
2438 sql_print_error("NDB Binlog: Writing row to ndb_binlog_index: %d", error
);
2440 goto add_ndb_binlog_index_err
;
2443 mysql_unlock_tables(thd
, thd
->lock
);
2445 thd
->options
= saved_options
;
2447 add_ndb_binlog_index_err
:
2448 close_thread_tables(thd
);
2449 ndb_binlog_index
= 0;
2450 thd
->options
= saved_options
;
2454 /*********************************************************************
2455 Functions for start, stop, wait for ndbcluster binlog thread
2456 *********************************************************************/
2458 enum Binlog_thread_state
2465 static enum Binlog_thread_state do_ndbcluster_binlog_close_connection
= BCCC_restart
;
2467 int ndbcluster_binlog_start()
2469 DBUG_ENTER("ndbcluster_binlog_start");
2471 if (::server_id
== 0)
2473 sql_print_warning("NDB: server id set to zero will cause any other mysqld "
2474 "with bin log to log with wrong server id");
2476 else if (::server_id
& 0x1 << 31)
2478 sql_print_error("NDB: server id's with high bit set is reserved for internal "
2483 pthread_mutex_init(&injector_mutex
, MY_MUTEX_INIT_FAST
);
2484 pthread_cond_init(&injector_cond
, NULL
);
2485 pthread_mutex_init(&ndb_schema_share_mutex
, MY_MUTEX_INIT_FAST
);
2487 /* Create injector thread */
2488 if (pthread_create(&ndb_binlog_thread
, &connection_attrib
,
2489 ndb_binlog_thread_func
, 0))
2491 DBUG_PRINT("error", ("Could not create ndb injector thread"));
2492 pthread_cond_destroy(&injector_cond
);
2493 pthread_mutex_destroy(&injector_mutex
);
2497 ndbcluster_binlog_inited
= 1;
2499 /* Wait for the injector thread to start */
2500 pthread_mutex_lock(&injector_mutex
);
2501 while (!ndb_binlog_thread_running
)
2502 pthread_cond_wait(&injector_cond
, &injector_mutex
);
2503 pthread_mutex_unlock(&injector_mutex
);
2505 if (ndb_binlog_thread_running
< 0)
2512 /**************************************************************
2513 Internal helper functions for creating/dropping ndb events
2514 used by the client sql threads
2515 **************************************************************/
2517 ndb_rep_event_name(String
*event_name
,const char *db
, const char *tbl
)
2519 event_name
->set_ascii("REPL$", 5);
2520 event_name
->append(db
);
2523 event_name
->append('/');
2524 event_name
->append(tbl
);
2529 ndbcluster_check_if_local_table(const char *dbname
, const char *tabname
)
2531 char key
[FN_REFLEN
+ 1];
2532 char ndb_file
[FN_REFLEN
+ 1];
2534 DBUG_ENTER("ndbcluster_check_if_local_table");
2535 build_table_filename(key
, FN_LEN
-1, dbname
, tabname
, reg_ext
, 0);
2536 build_table_filename(ndb_file
, FN_LEN
-1, dbname
, tabname
, ha_ndb_ext
, 0);
2537 /* Check that any defined table is an ndb table */
2538 DBUG_PRINT("info", ("Looking for file %s and %s", key
, ndb_file
));
2539 if ((! my_access(key
, F_OK
)) && my_access(ndb_file
, F_OK
))
2541 DBUG_PRINT("info", ("table file %s not on disk, local table", ndb_file
));
2551 ndbcluster_check_if_local_tables_in_db(THD
*thd
, const char *dbname
)
2553 DBUG_ENTER("ndbcluster_check_if_local_tables_in_db");
2554 DBUG_PRINT("info", ("Looking for files in directory %s", dbname
));
2555 LEX_STRING
*tabname
;
2556 List
<LEX_STRING
> files
;
2557 char path
[FN_REFLEN
+ 1];
2559 build_table_filename(path
, sizeof(path
) - 1, dbname
, "", "", 0);
2560 if (find_files(thd
, &files
, dbname
, path
, NullS
, 0) != FIND_FILES_OK
)
2562 DBUG_PRINT("info", ("Failed to find files"));
2565 DBUG_PRINT("info",("found: %d files", files
.elements
));
2566 while ((tabname
= files
.pop()))
2568 DBUG_PRINT("info", ("Found table %s", tabname
->str
));
2569 if (ndbcluster_check_if_local_table(dbname
, tabname
->str
))
2577 Common function for setting up everything for logging a table at
2580 int ndbcluster_create_binlog_setup(Ndb
*ndb
, const char *key
,
2583 const char *table_name
,
2584 my_bool share_may_exist
)
2586 int do_event_op
= ndb_binlog_running
;
2587 DBUG_ENTER("ndbcluster_create_binlog_setup");
2588 DBUG_PRINT("enter",("key: %s key_len: %d %s.%s share_may_exist: %d",
2589 key
, key_len
, db
, table_name
, share_may_exist
));
2590 DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(table_name
));
2591 DBUG_ASSERT(strlen(key
) == key_len
);
2593 pthread_mutex_lock(&ndbcluster_mutex
);
2595 /* Handle any trailing share */
2596 NDB_SHARE
*share
= (NDB_SHARE
*) hash_search(&ndbcluster_open_tables
,
2597 (uchar
*) key
, key_len
);
2599 if (share
&& share_may_exist
)
2601 if (share
->flags
& NSF_NO_BINLOG
||
2605 pthread_mutex_unlock(&ndbcluster_mutex
);
2606 DBUG_RETURN(0); // replication already setup, or should not
2612 if (share
->op
|| share
->op_old
)
2614 my_errno
= HA_ERR_TABLE_EXIST
;
2615 pthread_mutex_unlock(&ndbcluster_mutex
);
2618 if (!share_may_exist
|| share
->connect_count
!=
2619 g_ndb_cluster_connection
->get_connect_count())
2621 handle_trailing_share(share
);
2626 /* Create share which is needed to hold replication information */
2629 /* ndb_share reference create */
2631 DBUG_PRINT("NDB_SHARE", ("%s create use_count: %u",
2632 share
->key
, share
->use_count
));
2634 /* ndb_share reference create */
2635 else if (!(share
= get_share(key
, 0, TRUE
, TRUE
)))
2637 sql_print_error("NDB Binlog: "
2638 "allocating table share for %s failed", key
);
2642 DBUG_PRINT("NDB_SHARE", ("%s create use_count: %u",
2643 share
->key
, share
->use_count
));
2646 if (!ndb_schema_share
&&
2647 strcmp(share
->db
, NDB_REP_DB
) == 0 &&
2648 strcmp(share
->table_name
, NDB_SCHEMA_TABLE
) == 0)
2650 else if (!ndb_apply_status_share
&&
2651 strcmp(share
->db
, NDB_REP_DB
) == 0 &&
2652 strcmp(share
->table_name
, NDB_APPLY_TABLE
) == 0)
2657 share
->flags
|= NSF_NO_BINLOG
;
2658 pthread_mutex_unlock(&ndbcluster_mutex
);
2661 pthread_mutex_unlock(&ndbcluster_mutex
);
2663 while (share
&& !IS_TMP_PREFIX(table_name
))
2666 ToDo make sanity check of share so that the table is actually the same
2667 I.e. we need to do open file from frm in this case
2668 Currently awaiting this to be fixed in the 4.1 tree in the general
2672 /* Create the event in NDB */
2673 ndb
->setDatabaseName(db
);
2675 NDBDICT
*dict
= ndb
->getDictionary();
2676 Ndb_table_guard
ndbtab_g(dict
, table_name
);
2677 const NDBTAB
*ndbtab
= ndbtab_g
.get_table();
2680 if (ndb_extra_logging
)
2681 sql_print_information("NDB Binlog: Failed to get table %s from ndb: "
2682 "%s, %d", key
, dict
->getNdbError().message
,
2683 dict
->getNdbError().code
);
2686 String
event_name(INJECTOR_EVENT_LEN
);
2687 ndb_rep_event_name(&event_name
, db
, table_name
);
2689 event should have been created by someone else,
2690 but let's make sure, and create if it doesn't exist
2692 const NDBEVENT
*ev
= dict
->getEvent(event_name
.c_ptr());
2695 if (ndbcluster_create_event(ndb
, ndbtab
, event_name
.c_ptr(), share
))
2697 sql_print_error("NDB Binlog: "
2698 "FAILED CREATE (DISCOVER) TABLE Event: %s",
2699 event_name
.c_ptr());
2702 if (ndb_extra_logging
)
2703 sql_print_information("NDB Binlog: "
2704 "CREATE (DISCOVER) TABLE Event: %s",
2705 event_name
.c_ptr());
2710 if (ndb_extra_logging
)
2711 sql_print_information("NDB Binlog: DISCOVER TABLE Event: %s",
2712 event_name
.c_ptr());
2716 create the event operations for receiving logging events
2718 if (ndbcluster_create_event_ops(share
, ndbtab
, event_name
.c_ptr()))
2720 sql_print_error("NDB Binlog:"
2721 "FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s",
2722 event_name
.c_ptr());
2723 /* a warning has been issued to the client */
2732 ndbcluster_create_event(Ndb
*ndb
, const NDBTAB
*ndbtab
,
2733 const char *event_name
, NDB_SHARE
*share
,
2736 THD
*thd
= current_thd
;
2737 DBUG_ENTER("ndbcluster_create_event");
2738 DBUG_PRINT("info", ("table=%s version=%d event=%s share=%s",
2739 ndbtab
->getName(), ndbtab
->getObjectVersion(),
2740 event_name
, share
? share
->key
: "(nil)"));
2741 DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab
->getName()));
2744 DBUG_PRINT("info", ("share == NULL"));
2747 if (share
->flags
& NSF_NO_BINLOG
)
2749 DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x %d",
2750 share
->flags
, share
->flags
& NSF_NO_BINLOG
));
2754 NDBDICT
*dict
= ndb
->getDictionary();
2755 NDBEVENT
my_event(event_name
);
2756 my_event
.setTable(*ndbtab
);
2757 my_event
.addTableEvent(NDBEVENT::TE_ALL
);
2758 if (share
->flags
& NSF_HIDDEN_PK
)
2760 if (share
->flags
& NSF_BLOB_FLAG
)
2762 sql_print_error("NDB Binlog: logging of table %s "
2763 "with BLOB attribute and no PK is not supported",
2766 push_warning_printf(thd
, MYSQL_ERROR::WARN_LEVEL_ERROR
,
2767 ER_ILLEGAL_HA_CREATE_OPTION
,
2768 ER(ER_ILLEGAL_HA_CREATE_OPTION
),
2769 ndbcluster_hton_name
,
2770 "Binlog of table with BLOB attribute and no PK");
2772 share
->flags
|= NSF_NO_BINLOG
;
2775 /* No primary key, subscribe for all attributes */
2776 my_event
.setReport(NDBEVENT::ER_ALL
);
2777 DBUG_PRINT("info", ("subscription all"));
2781 if (ndb_schema_share
|| strcmp(share
->db
, NDB_REP_DB
) ||
2782 strcmp(share
->table_name
, NDB_SCHEMA_TABLE
))
2784 my_event
.setReport(NDBEVENT::ER_UPDATED
);
2785 DBUG_PRINT("info", ("subscription only updated"));
2789 my_event
.setReport((NDBEVENT::EventReport
)
2790 (NDBEVENT::ER_ALL
| NDBEVENT::ER_SUBSCRIBE
));
2791 DBUG_PRINT("info", ("subscription all and subscribe"));
2794 if (share
->flags
& NSF_BLOB_FLAG
)
2795 my_event
.mergeEvents(TRUE
);
2797 /* add all columns to the event */
2798 int n_cols
= ndbtab
->getNoOfColumns();
2799 for(int a
= 0; a
< n_cols
; a
++)
2800 my_event
.addEventColumn(a
);
2802 if (dict
->createEvent(my_event
)) // Add event to database
2804 if (dict
->getNdbError().classification
!= NdbError::SchemaObjectExists
)
2807 failed, print a warning
2809 if (push_warning
> 1)
2810 push_warning_printf(thd
, MYSQL_ERROR::WARN_LEVEL_ERROR
,
2811 ER_GET_ERRMSG
, ER(ER_GET_ERRMSG
),
2812 dict
->getNdbError().code
,
2813 dict
->getNdbError().message
, "NDB");
2814 sql_print_error("NDB Binlog: Unable to create event in database. "
2815 "Event: %s Error Code: %d Message: %s", event_name
,
2816 dict
->getNdbError().code
, dict
->getNdbError().message
);
2821 try retrieving the event, if table version/id matches, we will get
2822 a valid event. Otherwise we have a trailing event from before
2825 if ((ev
= dict
->getEvent(event_name
)))
2832 trailing event from before; an error, but try to correct it
2834 if (dict
->getNdbError().code
== NDB_INVALID_SCHEMA_OBJECT
&&
2835 dict
->dropEvent(my_event
.getName()))
2837 if (push_warning
> 1)
2838 push_warning_printf(thd
, MYSQL_ERROR::WARN_LEVEL_ERROR
,
2839 ER_GET_ERRMSG
, ER(ER_GET_ERRMSG
),
2840 dict
->getNdbError().code
,
2841 dict
->getNdbError().message
, "NDB");
2842 sql_print_error("NDB Binlog: Unable to create event in database. "
2843 " Attempt to correct with drop failed. "
2844 "Event: %s Error Code: %d Message: %s",
2846 dict
->getNdbError().code
,
2847 dict
->getNdbError().message
);
2852 try to add the event again
2854 if (dict
->createEvent(my_event
))
2856 if (push_warning
> 1)
2857 push_warning_printf(thd
, MYSQL_ERROR::WARN_LEVEL_ERROR
,
2858 ER_GET_ERRMSG
, ER(ER_GET_ERRMSG
),
2859 dict
->getNdbError().code
,
2860 dict
->getNdbError().message
, "NDB");
2861 sql_print_error("NDB Binlog: Unable to create event in database. "
2862 " Attempt to correct with drop ok, but create failed. "
2863 "Event: %s Error Code: %d Message: %s",
2865 dict
->getNdbError().code
,
2866 dict
->getNdbError().message
);
2869 #ifdef NDB_BINLOG_EXTRA_WARNINGS
2870 push_warning_printf(thd
, MYSQL_ERROR::WARN_LEVEL_ERROR
,
2871 ER_GET_ERRMSG
, ER(ER_GET_ERRMSG
),
2872 0, "NDB Binlog: Removed trailing event",
2880 inline int is_ndb_compatible_type(Field
*field
)
2883 !(field
->flags
& BLOB_FLAG
) &&
2884 field
->type() != MYSQL_TYPE_BIT
&&
2885 field
->pack_length() != 0;
2889 - create eventOperations for receiving log events
2890 - setup ndb recattrs for reception of log event data
2891 - "start" the event operation
2893 used at create/discover of tables
2896 ndbcluster_create_event_ops(NDB_SHARE
*share
, const NDBTAB
*ndbtab
,
2897 const char *event_name
)
2899 THD
*thd
= current_thd
;
2901 we are in either create table or rename table so table should be
2902 locked, hence we can work with the share without locks
2905 DBUG_ENTER("ndbcluster_create_event_ops");
2906 DBUG_PRINT("enter", ("table: %s event: %s", ndbtab
->getName(), event_name
));
2907 DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab
->getName()));
2909 DBUG_ASSERT(share
!= 0);
2911 if (share
->flags
& NSF_NO_BINLOG
)
2913 DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x",
2918 int do_ndb_schema_share
= 0, do_ndb_apply_status_share
= 0;
2919 if (!ndb_schema_share
&& strcmp(share
->db
, NDB_REP_DB
) == 0 &&
2920 strcmp(share
->table_name
, NDB_SCHEMA_TABLE
) == 0)
2921 do_ndb_schema_share
= 1;
2922 else if (!ndb_apply_status_share
&& strcmp(share
->db
, NDB_REP_DB
) == 0 &&
2923 strcmp(share
->table_name
, NDB_APPLY_TABLE
) == 0)
2924 do_ndb_apply_status_share
= 1;
2925 else if (!binlog_filter
->db_ok(share
->db
) || !ndb_binlog_running
)
2927 share
->flags
|= NSF_NO_BINLOG
;
2933 assert(share
->op
->getCustomData() == (void *) share
);
2935 DBUG_ASSERT(share
->use_count
> 1);
2936 sql_print_error("NDB Binlog: discover reusing old ev op");
2937 /* ndb_share reference ToDo free */
2938 DBUG_PRINT("NDB_SHARE", ("%s ToDo free use_count: %u",
2939 share
->key
, share
->use_count
));
2940 free_share(&share
); // old event op already has reference
2944 TABLE
*table
= share
->table
;
2948 100 milliseconds, temporary error on schema operation can
2949 take some time to be resolved
2951 int retry_sleep
= 100;
2954 pthread_mutex_lock(&injector_mutex
);
2955 Ndb
*ndb
= injector_ndb
;
2956 if (do_ndb_schema_share
)
2961 pthread_mutex_unlock(&injector_mutex
);
2965 NdbEventOperation
* op
;
2966 if (do_ndb_schema_share
)
2967 op
= ndb
->createEventOperation(event_name
);
2970 // set injector_ndb database/schema from table internal name
2971 int ret
= ndb
->setDatabaseAndSchemaName(ndbtab
);
2973 op
= ndb
->createEventOperation(event_name
);
2974 // reset to catch errors
2975 ndb
->setDatabaseName("");
2979 sql_print_error("NDB Binlog: Creating NdbEventOperation failed for"
2981 push_warning_printf(thd
, MYSQL_ERROR::WARN_LEVEL_ERROR
,
2982 ER_GET_ERRMSG
, ER(ER_GET_ERRMSG
),
2983 ndb
->getNdbError().code
,
2984 ndb
->getNdbError().message
,
2986 pthread_mutex_unlock(&injector_mutex
);
2990 if (share
->flags
& NSF_BLOB_FLAG
)
2991 op
->mergeEvents(TRUE
); // currently not inherited from event
2993 DBUG_PRINT("info", ("share->ndb_value[0]: 0x%lx share->ndb_value[1]: 0x%lx",
2994 (long) share
->ndb_value
[0],
2995 (long) share
->ndb_value
[1]));
2996 int n_columns
= ndbtab
->getNoOfColumns();
2997 int n_fields
= table
? table
->s
->fields
: 0; // XXX ???
2998 for (int j
= 0; j
< n_columns
; j
++)
3000 const char *col_name
= ndbtab
->getColumn(j
)->getName();
3001 NdbValue attr0
, attr1
;
3004 Field
*f
= share
->table
->field
[j
];
3005 if (is_ndb_compatible_type(f
))
3007 DBUG_PRINT("info", ("%s compatible", col_name
));
3008 attr0
.rec
= op
->getValue(col_name
, (char*) f
->ptr
);
3009 attr1
.rec
= op
->getPreValue(col_name
,
3010 (f
->ptr
- share
->table
->record
[0]) +
3011 (char*) share
->table
->record
[1]);
3013 else if (! (f
->flags
& BLOB_FLAG
))
3015 DBUG_PRINT("info", ("%s non compatible", col_name
));
3016 attr0
.rec
= op
->getValue(col_name
);
3017 attr1
.rec
= op
->getPreValue(col_name
);
3021 DBUG_PRINT("info", ("%s blob", col_name
));
3022 DBUG_ASSERT(share
->flags
& NSF_BLOB_FLAG
);
3023 attr0
.blob
= op
->getBlobHandle(col_name
);
3024 attr1
.blob
= op
->getPreBlobHandle(col_name
);
3025 if (attr0
.blob
== NULL
|| attr1
.blob
== NULL
)
3027 sql_print_error("NDB Binlog: Creating NdbEventOperation"
3028 " blob field %u handles failed (code=%d) for %s",
3029 j
, op
->getNdbError().code
, event_name
);
3030 push_warning_printf(thd
, MYSQL_ERROR::WARN_LEVEL_ERROR
,
3031 ER_GET_ERRMSG
, ER(ER_GET_ERRMSG
),
3032 op
->getNdbError().code
,
3033 op
->getNdbError().message
,
3035 ndb
->dropEventOperation(op
);
3036 pthread_mutex_unlock(&injector_mutex
);
3043 DBUG_PRINT("info", ("%s hidden key", col_name
));
3044 attr0
.rec
= op
->getValue(col_name
);
3045 attr1
.rec
= op
->getPreValue(col_name
);
3047 share
->ndb_value
[0][j
].ptr
= attr0
.ptr
;
3048 share
->ndb_value
[1][j
].ptr
= attr1
.ptr
;
3049 DBUG_PRINT("info", ("&share->ndb_value[0][%d]: 0x%lx "
3050 "share->ndb_value[0][%d]: 0x%lx",
3051 j
, (long) &share
->ndb_value
[0][j
],
3052 j
, (long) attr0
.ptr
));
3053 DBUG_PRINT("info", ("&share->ndb_value[1][%d]: 0x%lx "
3054 "share->ndb_value[1][%d]: 0x%lx",
3055 j
, (long) &share
->ndb_value
[0][j
],
3056 j
, (long) attr1
.ptr
));
3058 op
->setCustomData((void *) share
); // set before execute
3059 share
->op
= op
; // assign op in NDB_SHARE
3064 if (op
->getNdbError().status
!= NdbError::TemporaryError
&&
3065 op
->getNdbError().code
!= 1407)
3069 push_warning_printf(thd
, MYSQL_ERROR::WARN_LEVEL_ERROR
,
3070 ER_GET_ERRMSG
, ER(ER_GET_ERRMSG
),
3071 op
->getNdbError().code
, op
->getNdbError().message
,
3073 sql_print_error("NDB Binlog: ndbevent->execute failed for %s; %d %s",
3075 op
->getNdbError().code
, op
->getNdbError().message
);
3077 ndb
->dropEventOperation(op
);
3078 pthread_mutex_unlock(&injector_mutex
);
3081 my_sleep(retry_sleep
);
3086 pthread_mutex_unlock(&injector_mutex
);
3090 /* ndb_share reference binlog */
3092 DBUG_PRINT("NDB_SHARE", ("%s binlog use_count: %u",
3093 share
->key
, share
->use_count
));
3094 if (do_ndb_apply_status_share
)
3096 /* ndb_share reference binlog extra */
3097 ndb_apply_status_share
= get_share(share
);
3098 DBUG_PRINT("NDB_SHARE", ("%s binlog extra use_count: %u",
3099 share
->key
, share
->use_count
));
3100 (void) pthread_cond_signal(&injector_cond
);
3102 else if (do_ndb_schema_share
)
3104 /* ndb_share reference binlog extra */
3105 ndb_schema_share
= get_share(share
);
3106 DBUG_PRINT("NDB_SHARE", ("%s binlog extra use_count: %u",
3107 share
->key
, share
->use_count
));
3108 (void) pthread_cond_signal(&injector_cond
);
3111 DBUG_PRINT("info",("%s share->op: 0x%lx share->use_count: %u",
3112 share
->key
, (long) share
->op
, share
->use_count
));
3114 if (ndb_extra_logging
)
3115 sql_print_information("NDB Binlog: logging %s", share
->key
);
3120 when entering the calling thread should have a share lock id share != 0
3121 then the injector thread will have one as well, i.e. share->use_count == 0
3122 (unless it has already dropped... then share->op == 0)
3125 ndbcluster_handle_drop_table(Ndb
*ndb
, const char *event_name
,
3126 NDB_SHARE
*share
, const char *type_str
)
3128 DBUG_ENTER("ndbcluster_handle_drop_table");
3129 THD
*thd
= current_thd
;
3131 NDBDICT
*dict
= ndb
->getDictionary();
3132 if (event_name
&& dict
->dropEvent(event_name
))
3134 if (dict
->getNdbError().code
!= 4710)
3136 /* drop event failed for some reason, issue a warning */
3137 push_warning_printf(thd
, MYSQL_ERROR::WARN_LEVEL_ERROR
,
3138 ER_GET_ERRMSG
, ER(ER_GET_ERRMSG
),
3139 dict
->getNdbError().code
,
3140 dict
->getNdbError().message
, "NDB");
3141 /* error is not that the event did not exist */
3142 sql_print_error("NDB Binlog: Unable to drop event in database. "
3143 "Event: %s Error Code: %d Message: %s",
3145 dict
->getNdbError().code
,
3146 dict
->getNdbError().message
);
3147 /* ToDo; handle error? */
3148 if (share
&& share
->op
&&
3149 share
->op
->getState() == NdbEventOperation::EO_EXECUTING
&&
3150 dict
->getNdbError().mysql_code
!= HA_ERR_NO_CONNECTION
)
3158 if (share
== 0 || share
->op
== 0)
3164 Synchronized drop between client thread and injector thread is
3165 necessary in order to maintain ordering in the binlog,
3166 such that the drop occurs _after_ any inserts/updates/deletes.
3168 The penalty for this is that the drop table becomes slow.
3170 This wait is however not strictly necessary to produce a binlog
3171 that is usable. However the slave does not currently handle
3172 these out of order, thus we are keeping the SYNC_DROP_ defined
3175 const char *save_proc_info
= thd
->proc_info
;
3178 thd
->proc_info
= "Syncing ndb table schema operation and binlog";
3179 (void) pthread_mutex_lock(&share
->mutex
);
3180 safe_mutex_assert_owner(&LOCK_open
);
3181 (void) pthread_mutex_unlock(&LOCK_open
);
3182 int max_timeout
= opt_ndb_sync_timeout
;
3185 struct timespec abstime
;
3186 set_timespec(abstime
, 1);
3187 int ret
= pthread_cond_timedwait(&injector_cond
,
3196 if (max_timeout
== 0)
3198 sql_print_error("NDB %s: %s timed out. Ignoring...",
3199 type_str
, share
->key
);
3202 if (ndb_extra_logging
)
3203 ndb_report_waiting(type_str
, max_timeout
,
3204 type_str
, share
->key
);
3207 (void) pthread_mutex_lock(&LOCK_open
);
3208 (void) pthread_mutex_unlock(&share
->mutex
);
3210 (void) pthread_mutex_lock(&share
->mutex
);
3211 share
->op_old
= share
->op
;
3213 (void) pthread_mutex_unlock(&share
->mutex
);
3215 thd
->proc_info
= save_proc_info
;
3221 /********************************************************************
3222 Internal helper functions for different events from the storage nodes
3223 used by the ndb injector thread
3224 ********************************************************************/
3227 Handle error states on events from the storage nodes
3229 static int ndb_binlog_thread_handle_error(Ndb
*ndb
, NdbEventOperation
*pOp
,
3230 ndb_binlog_index_row
&row
)
3232 NDB_SHARE
*share
= (NDB_SHARE
*)pOp
->getCustomData();
3233 DBUG_ENTER("ndb_binlog_thread_handle_error");
3235 int overrun
= pOp
->isOverrun();
3239 ToDo: this error should rather clear the ndb_binlog_index...
3242 sql_print_error("NDB Binlog: Overrun in event buffer, "
3243 "this means we have dropped events. Cannot "
3244 "continue binlog for %s", share
->key
);
3249 if (!pOp
->isConsistent())
3252 ToDo: this error should rather clear the ndb_binlog_index...
3255 sql_print_error("NDB Binlog: Not Consistent. Cannot "
3256 "continue binlog for %s. Error code: %d"
3257 " Message: %s", share
->key
,
3258 pOp
->getNdbError().code
,
3259 pOp
->getNdbError().message
);
3263 sql_print_error("NDB Binlog: unhandled error %d for table %s",
3264 pOp
->hasError(), share
->key
);
3270 ndb_binlog_thread_handle_non_data_event(THD
*thd
, Ndb
*ndb
,
3271 NdbEventOperation
*pOp
,
3272 ndb_binlog_index_row
&row
)
3274 NDB_SHARE
*share
= (NDB_SHARE
*)pOp
->getCustomData();
3275 NDBEVENT::TableEvent type
= pOp
->getEventType();
3279 case NDBEVENT::TE_CLUSTER_FAILURE
:
3280 if (ndb_extra_logging
)
3281 sql_print_information("NDB Binlog: cluster failure for %s at epoch %u.",
3282 share
->key
, (unsigned) pOp
->getGCI());
3283 if (ndb_apply_status_share
== share
)
3285 if (ndb_extra_logging
&&
3286 ndb_binlog_tables_inited
&& ndb_binlog_running
)
3287 sql_print_information("NDB Binlog: ndb tables initially "
3288 "read only on reconnect.");
3289 /* ndb_share reference binlog extra free */
3290 DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
3291 share
->key
, share
->use_count
));
3292 free_share(&ndb_apply_status_share
);
3293 ndb_apply_status_share
= 0;
3294 ndb_binlog_tables_inited
= 0;
3296 DBUG_PRINT("error", ("CLUSTER FAILURE EVENT: "
3297 "%s received share: 0x%lx op: 0x%lx share op: 0x%lx "
3299 share
->key
, (long) share
, (long) pOp
,
3300 (long) share
->op
, (long) share
->op_old
));
3302 case NDBEVENT::TE_DROP
:
3303 if (ndb_apply_status_share
== share
)
3305 if (ndb_extra_logging
&&
3306 ndb_binlog_tables_inited
&& ndb_binlog_running
)
3307 sql_print_information("NDB Binlog: ndb tables initially "
3308 "read only on reconnect.");
3309 /* ndb_share reference binlog extra free */
3310 DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
3311 share
->key
, share
->use_count
));
3312 free_share(&ndb_apply_status_share
);
3313 ndb_apply_status_share
= 0;
3314 ndb_binlog_tables_inited
= 0;
3316 /* ToDo: remove printout */
3317 if (ndb_extra_logging
)
3318 sql_print_information("NDB Binlog: drop table %s.", share
->key
);
3320 case NDBEVENT::TE_ALTER
:
3322 DBUG_PRINT("info", ("TABLE %s EVENT: %s received share: 0x%lx op: 0x%lx "
3323 "share op: 0x%lx op_old: 0x%lx",
3324 type
== NDBEVENT::TE_DROP
? "DROP" : "ALTER",
3325 share
->key
, (long) share
, (long) pOp
,
3326 (long) share
->op
, (long) share
->op_old
));
3328 case NDBEVENT::TE_NODE_FAILURE
:
3330 case NDBEVENT::TE_SUBSCRIBE
:
3332 case NDBEVENT::TE_UNSUBSCRIBE
:
3336 sql_print_error("NDB Binlog: unknown non data event %d for %s. "
3337 "Ignoring...", (unsigned) type
, share
->key
);
3341 ndb_handle_schema_change(thd
, ndb
, pOp
, share
);
3346 Handle data events from the storage nodes
3349 ndb_binlog_thread_handle_data_event(Ndb
*ndb
, NdbEventOperation
*pOp
,
3350 ndb_binlog_index_row
&row
,
3351 injector::transaction
&trans
)
3353 NDB_SHARE
*share
= (NDB_SHARE
*) pOp
->getCustomData();
3354 if (share
== ndb_apply_status_share
)
3357 uint32 originating_server_id
= pOp
->getAnyValue();
3358 if (originating_server_id
== 0)
3359 originating_server_id
= ::server_id
;
3360 else if (originating_server_id
& NDB_ANYVALUE_RESERVED
)
3362 if (originating_server_id
!= NDB_ANYVALUE_FOR_NOLOGGING
)
3363 sql_print_warning("NDB: unknown value for binlog signalling 0x%X, "
3365 originating_server_id
);
3368 else if (!g_ndb_log_slave_updates
)
3371 This event comes from a slave applier since it has an originating
3372 server id set. Since option to log slave updates is not set, skip it.
3377 TABLE
*table
= share
->table
;
3378 DBUG_ASSERT(trans
.good());
3379 DBUG_ASSERT(table
!= 0);
3381 dbug_print_table("table", table
);
3383 TABLE_SHARE
*table_s
= table
->s
;
3384 uint n_fields
= table_s
->fields
;
3386 /* Potential buffer for the bitmap */
3387 uint32 bitbuf
[128 / (sizeof(uint32
) * 8)];
3388 bitmap_init(&b
, n_fields
<= sizeof(bitbuf
) * 8 ? bitbuf
: NULL
,
3393 row data is already in table->record[0]
3394 As we told the NdbEventOperation to do this
3395 (saves moving data about many times)
3399 for now malloc/free blobs buffer each time
3400 TODO if possible share single permanent buffer with handlers
3402 uchar
* blobs_buffer
[2] = { 0, 0 };
3403 uint blobs_buffer_size
[2] = { 0, 0 };
3405 switch(pOp
->getEventType())
3407 case NDBEVENT::TE_INSERT
:
3409 DBUG_PRINT("info", ("INSERT INTO %s.%s",
3410 table_s
->db
.str
, table_s
->table_name
.str
));
3412 if (share
->flags
& NSF_BLOB_FLAG
)
3414 my_ptrdiff_t ptrdiff
= 0;
3415 IF_DBUG(int ret
=) get_ndb_blobs_value(table
, share
->ndb_value
[0],
3417 blobs_buffer_size
[0],
3419 DBUG_ASSERT(ret
== 0);
3421 ndb_unpack_record(table
, share
->ndb_value
[0], &b
, table
->record
[0]);
3422 IF_DBUG(int ret
=) trans
.write_row(originating_server_id
,
3423 injector::transaction::table(table
,
3425 &b
, n_fields
, table
->record
[0]);
3426 DBUG_ASSERT(ret
== 0);
3429 case NDBEVENT::TE_DELETE
:
3431 DBUG_PRINT("info",("DELETE FROM %s.%s",
3432 table_s
->db
.str
, table_s
->table_name
.str
));
3435 table->record[0] contains only the primary key in this case
3436 since we do not have an after image
3439 if (table
->s
->primary_key
!= MAX_KEY
)
3441 use the primary key only as it save time and space and
3442 it is the only thing needed to log the delete
3446 we use the before values since we don't have a primary key
3447 since the mysql server does not handle the hidden primary
3451 if (share
->flags
& NSF_BLOB_FLAG
)
3453 my_ptrdiff_t ptrdiff
= table
->record
[n
] - table
->record
[0];
3454 IF_DBUG(int ret
=) get_ndb_blobs_value(table
, share
->ndb_value
[n
],
3456 blobs_buffer_size
[n
],
3458 DBUG_ASSERT(ret
== 0);
3460 ndb_unpack_record(table
, share
->ndb_value
[n
], &b
, table
->record
[n
]);
3461 DBUG_EXECUTE("info", print_records(table
, table
->record
[n
]););
3462 IF_DBUG(int ret
=) trans
.delete_row(originating_server_id
,
3463 injector::transaction::table(table
,
3465 &b
, n_fields
, table
->record
[n
]);
3466 DBUG_ASSERT(ret
== 0);
3469 case NDBEVENT::TE_UPDATE
:
3471 DBUG_PRINT("info", ("UPDATE %s.%s",
3472 table_s
->db
.str
, table_s
->table_name
.str
));
3474 if (share
->flags
& NSF_BLOB_FLAG
)
3476 my_ptrdiff_t ptrdiff
= 0;
3477 IF_DBUG(int ret
=) get_ndb_blobs_value(table
, share
->ndb_value
[0],
3479 blobs_buffer_size
[0],
3481 DBUG_ASSERT(ret
== 0);
3483 ndb_unpack_record(table
, share
->ndb_value
[0],
3484 &b
, table
->record
[0]);
3485 DBUG_EXECUTE("info", print_records(table
, table
->record
[0]););
3486 if (table
->s
->primary_key
!= MAX_KEY
)
3489 since table has a primary key, we can do a write
3490 using only after values
3492 trans
.write_row(originating_server_id
,
3493 injector::transaction::table(table
, TRUE
),
3494 &b
, n_fields
, table
->record
[0]);// after values
3499 mysql server cannot handle the ndb hidden key and
3500 therefore needs the before image as well
3502 if (share
->flags
& NSF_BLOB_FLAG
)
3504 my_ptrdiff_t ptrdiff
= table
->record
[1] - table
->record
[0];
3505 IF_DBUG(int ret
=) get_ndb_blobs_value(table
, share
->ndb_value
[1],
3507 blobs_buffer_size
[1],
3509 DBUG_ASSERT(ret
== 0);
3511 ndb_unpack_record(table
, share
->ndb_value
[1], &b
, table
->record
[1]);
3512 DBUG_EXECUTE("info", print_records(table
, table
->record
[1]););
3513 IF_DBUG(int ret
=) trans
.update_row(originating_server_id
,
3514 injector::transaction::table(table
,
3517 table
->record
[1], // before values
3518 table
->record
[0]);// after values
3519 DBUG_ASSERT(ret
== 0);
3524 /* We should REALLY never get here. */
3525 DBUG_PRINT("info", ("default - uh oh, a brain exploded."));
3529 if (share
->flags
& NSF_BLOB_FLAG
)
3531 my_free(blobs_buffer
[0], MYF(MY_ALLOW_ZERO_PTR
));
3532 my_free(blobs_buffer
[1], MYF(MY_ALLOW_ZERO_PTR
));
3538 //#define RUN_NDB_BINLOG_TIMER
3539 #ifdef RUN_NDB_BINLOG_TIMER
3543 Timer() { start(); }
3544 void start() { gettimeofday(&m_start
, 0); }
3545 void stop() { gettimeofday(&m_stop
, 0); }
3549 (((longlong
) m_stop
.tv_sec
- (longlong
) m_start
.tv_sec
) * 1000 +
3550 ((longlong
) m_stop
.tv_usec
-
3551 (longlong
) m_start
.tv_usec
+ 999) / 1000);
3554 struct timeval m_start
,m_stop
;
3558 /****************************************************************
3559 Injector thread main loop
3560 ****************************************************************/
3563 ndb_schema_objects_get_key(NDB_SCHEMA_OBJECT
*schema_object
,
3565 my_bool not_used
__attribute__((unused
)))
3567 *length
= schema_object
->key_length
;
3568 return (uchar
*) schema_object
->key
;
// Looks up the NDB_SCHEMA_OBJECT stored under `key` in the ndb_schema_objects
// hash. If it is not found and create_if_not_exists is set, a new object is
// allocated, initialised and inserted into the hash.
//
// key                   NUL-terminated lookup key (its strlen() is the key
//                       length used for the hash).
// create_if_not_exists  when false, a failed lookup only logs "does not exist".
//
// Returns the object via DBUG_RETURN; when the pointer is non-NULL its
// use_count has been incremented first. ndb_free_schema_object() is the
// function that decrements that counter again.
//
// The visible code locks ndbcluster_mutex before the lookup and unlocks it
// before returning.
//
// NOTE(review): on my_hash_insert() failure the object is released with
// my_free(), but nothing visible here resets ndb_schema_object before the
// later `if (ndb_schema_object)` / use_count++ -- confirm against the full
// source that the freed pointer cannot be dereferenced or returned.
3571 static NDB_SCHEMA_OBJECT
*ndb_get_schema_object(const char *key
,
3572 my_bool create_if_not_exists
,
3575 NDB_SCHEMA_OBJECT
*ndb_schema_object
;
3576 uint length
= (uint
) strlen(key
);
3577 DBUG_ENTER("ndb_get_schema_object");
3578 DBUG_PRINT("enter", ("key: '%s'", key
));
3581 pthread_mutex_lock(&ndbcluster_mutex
);
3582 while (!(ndb_schema_object
=
3583 (NDB_SCHEMA_OBJECT
*) hash_search(&ndb_schema_objects
,
3587 if (!create_if_not_exists
)
3589 DBUG_PRINT("info", ("does not exist"));
// A single allocation holds the struct followed by room for the key and its
// terminating NUL (length + 1 bytes); ->key is then pointed just past the
// struct and the key, including the NUL, is copied there.
3592 if (!(ndb_schema_object
=
3593 (NDB_SCHEMA_OBJECT
*) my_malloc(sizeof(*ndb_schema_object
) + length
+ 1,
3594 MYF(MY_WME
| MY_ZEROFILL
))))
3596 DBUG_PRINT("info", ("malloc error"));
3599 ndb_schema_object
->key
= (char *)(ndb_schema_object
+1);
3600 memcpy(ndb_schema_object
->key
, key
, length
+ 1);
3601 ndb_schema_object
->key_length
= length
;
// Register the new object in the hash; a non-zero result from
// my_hash_insert() is treated as failure and the allocation is freed.
3602 if (my_hash_insert(&ndb_schema_objects
, (uchar
*) ndb_schema_object
))
3604 my_free((uchar
*) ndb_schema_object
, 0);
// Freshly inserted object: set up its mutex and an slock bitmap that spans
// every bit of the slock array (sizeof(slock) * 8), then clear all bits.
3607 pthread_mutex_init(&ndb_schema_object
->mutex
, MY_MUTEX_INIT_FAST
);
3608 bitmap_init(&ndb_schema_object
->slock_bitmap
, ndb_schema_object
->slock
,
3609 sizeof(ndb_schema_object
->slock
)*8, FALSE
);
3610 bitmap_clear_all(&ndb_schema_object
->slock_bitmap
);
// Count the reference that is handed to the caller.
3613 if (ndb_schema_object
)
3615 ndb_schema_object
->use_count
++;
3616 DBUG_PRINT("info", ("use_count: %d", ndb_schema_object
->use_count
));
3619 pthread_mutex_unlock(&ndbcluster_mutex
);
3620 DBUG_RETURN(ndb_schema_object
);
// Drops one reference to *ndb_schema_object. use_count is decremented while
// ndbcluster_mutex is held; when it reaches zero the object is removed from
// the ndb_schema_objects hash, its mutex is destroyed, the memory is freed
// and the caller's pointer is set to 0. Otherwise only the remaining
// use_count is logged.
//
// ndb_schema_object  address of the caller's pointer; it is dereferenced
//                    unconditionally, so it must point at a valid object.
3624 static void ndb_free_schema_object(NDB_SCHEMA_OBJECT
**ndb_schema_object
,
3627 DBUG_ENTER("ndb_free_schema_object");
3628 DBUG_PRINT("enter", ("key: '%s'", (*ndb_schema_object
)->key
));
3630 pthread_mutex_lock(&ndbcluster_mutex
);
// Pre-decrement: the branch below runs only for the last reference.
3631 if (!--(*ndb_schema_object
)->use_count
)
3633 DBUG_PRINT("info", ("use_count: %d", (*ndb_schema_object
)->use_count
));
3634 hash_delete(&ndb_schema_objects
, (uchar
*) *ndb_schema_object
);
3635 pthread_mutex_destroy(&(*ndb_schema_object
)->mutex
);
3636 my_free((uchar
*) *ndb_schema_object
, MYF(0));
3637 *ndb_schema_object
= 0;
// Still referenced elsewhere: just trace the remaining count.
3641 DBUG_PRINT("info", ("use_count: %d", (*ndb_schema_object
)->use_count
));
3644 pthread_mutex_unlock(&ndbcluster_mutex
);
// Thread function of the NDB binlog injector thread.
//
// Visible outline:
//   1. Create and initialise a THD for this thread and register it in the
//      global thread list.
//   2. Create two Ndb objects on g_ndb_cluster_connection: s_ndb (schema
//      events) and i_ndb (data events); i_ndb is published as injector_ndb.
//   3. Report start-up (or failure) through ndb_binlog_thread_running and
//      injector_cond, then wait for the MySQL server to finish starting.
//   4. Wait for the cluster, then repeatedly poll both Ndb objects for
//      events, handle schema events, and write data events of each epoch
//      (gci) to the binlog through an injector transaction.
//   5. On shutdown, release shares, drop all event operations, free the
//      schema object hash and mark the thread as stopped.
//
// arg is not referenced in the visible code. The function returns NULL.
3649 pthread_handler_t
ndb_binlog_thread_func(void *arg
)
3651 THD
*thd
; /* needs to be first for thread_stack */
3655 int ndb_update_ndb_binlog_index
= 1;
3656 injector
*inj
= injector::instance();
3657 uint incident_id
= 0;
3659 #ifdef RUN_NDB_BINLOG_TIMER
// injector_mutex is taken for the start-up phase. Each visible start-up
// failure path sets ndb_binlog_thread_running to -1, unlocks the mutex and
// signals injector_cond; the success path does the same with the value 1.
3663 pthread_mutex_lock(&injector_mutex
);
3668 DBUG_ENTER("ndb_binlog_thread");
3670 thd
= new THD
; /* note that constructor of THD uses DBUG_ */
3671 THD_CHECK_SENTRY(thd
);
3673 /* We need to set thd->thread_id before thd->store_globals, or it will
3674 set an invalid value for thd->variables.pseudo_thread_id.
3676 pthread_mutex_lock(&LOCK_thread_count
);
3677 thd
->thread_id
= thread_id
++;
3678 pthread_mutex_unlock(&LOCK_thread_count
);
3680 thd
->thread_stack
= (char*) &thd
; /* remember where our stack is */
// A true result from store_globals() is handled as a start-up failure.
3681 if (thd
->store_globals())
3685 ndb_binlog_thread_running
= -1;
3686 pthread_mutex_unlock(&injector_mutex
);
3687 pthread_cond_signal(&injector_cond
);
3689 DBUG_LEAVE
; // Must match DBUG_ENTER()
3692 return NULL
; // Avoid compiler warnings
// Configure the THD as a daemon/system thread: all master_access bits set,
// no priv_user, empty host_or_ip and no client capabilities.
3696 thd
->init_for_queries();
3697 thd
->command
= COM_DAEMON
;
3698 thd
->system_thread
= SYSTEM_THREAD_NDBCLUSTER_BINLOG
;
3699 thd
->version
= refresh_version
;
3700 thd
->main_security_ctx
.host_or_ip
= "";
3701 thd
->client_capabilities
= 0;
3702 my_net_init(&thd
->net
, 0);
3703 thd
->main_security_ctx
.master_access
= ~0;
3704 thd
->main_security_ctx
.priv_user
= 0;
3709 sql_print_information("Starting MySQL Cluster Binlog Thread");
// Detach the thread and add its THD to the global `threads` list under
// LOCK_thread_count.
3711 pthread_detach_this_thread();
3712 thd
->real_id
= pthread_self();
3713 pthread_mutex_lock(&LOCK_thread_count
);
3714 threads
.append(thd
);
3715 pthread_mutex_unlock(&LOCK_thread_count
);
3716 thd
->lex
->start_transaction_opt
= 0;
// Create the two Ndb objects. s_ndb is the "Schema Ndb object" (see the
// error text); i_ndb is the one later exposed as injector_ndb.
3718 if (!(s_ndb
= new Ndb(g_ndb_cluster_connection
, "")) ||
3721 sql_print_error("NDB Binlog: Getting Schema Ndb object failed");
3722 ndb_binlog_thread_running
= -1;
3723 pthread_mutex_unlock(&injector_mutex
);
3724 pthread_cond_signal(&injector_cond
);
3729 if (!(i_ndb
= new Ndb(g_ndb_cluster_connection
, "")) ||
3732 sql_print_error("NDB Binlog: Getting Ndb object failed");
3733 ndb_binlog_thread_running
= -1;
3734 pthread_mutex_unlock(&injector_mutex
);
3735 pthread_cond_signal(&injector_cond
);
3739 /* init hash for schema object distribution */
3740 (void) hash_init(&ndb_schema_objects
, system_charset_info
, 32, 0, 0,
3741 (hash_get_key
)ndb_schema_objects_get_key
, 0, 0);
3744 Expose global reference to our ndb object.
3746 Used by both sql client thread and binlog thread to interact
3748 pthread_mutex_lock(&injector_mutex);
3751 injector_ndb
= i_ndb
;
3753 injector_ndb
->get_ndb_cluster_connection().get_latest_trans_gci();
3758 ndb_binlog_running
= TRUE
;
3761 /* Thread start up completed */
3762 ndb_binlog_thread_running
= 1;
3763 pthread_mutex_unlock(&injector_mutex
);
3764 pthread_cond_signal(&injector_cond
);
3767 wait for mysql server to start (so that the binlog is started
3768 and thus can receive the first GAP event)
// Wait on COND_server_started with a timeout set by set_timespec(abstime, 1)
// until mysqld_server_started is set; ndbcluster_terminating is checked
// after each wait.
3770 pthread_mutex_lock(&LOCK_server_started
);
3771 while (!mysqld_server_started
)
3773 struct timespec abstime
;
3774 set_timespec(abstime
, 1);
3775 pthread_cond_timedwait(&COND_server_started
, &LOCK_server_started
,
3777 if (ndbcluster_terminating
)
3779 pthread_mutex_unlock(&LOCK_server_started
);
3783 pthread_mutex_unlock(&LOCK_server_started
);
3786 Main NDB Injector loop
// While binlogging is enabled: record an INCIDENT_LOST_EVENTS incident (the
// "GAP event" of the surrounding comments) in the binlog. For incident_id 0
// the current binlog file name is inspected first, because per the comment
// below no gap is inserted for the very first log.
3788 while (ndb_binlog_running
)
3791 check if it is the first log, if so we do not insert a GAP event
3792 as there is really no log to have a GAP in
3794 if (incident_id
== 0)
3797 mysql_bin_log
.get_current_log(&log_info
);
3798 int len
= strlen(log_info
.log_file_name
);
// Parse the last six characters of the binlog file name as an unsigned
// number.
3800 if ((sscanf(log_info
.log_file_name
+ len
- 6, "%u", &no
) == 1) &&
3803 /* this is the first log, so skip GAP event */
3809 Always insert a GAP event as we cannot know what has happened
3810 in the cluster while not being connected.
// Incident messages; the one used is selected by incident_id.
3812 LEX_STRING
const msg
[2]=
3814 { C_STRING_WITH_LEN("mysqld startup") },
3815 { C_STRING_WITH_LEN("cluster disconnect")}
3818 inj
->record_incident(thd
, INCIDENT_LOST_EVENTS
, msg
[incident_id
]);
3819 DBUG_ASSERT(!error
);
3824 thd
->proc_info
= "Waiting for ndbcluster to start";
// Under injector_mutex, wait on injector_cond (timeout from
// set_timespec(abstime, 1)) until ndb_schema_share exists and, when
// binlogging is enabled, ndb_apply_status_share exists as well.
// ndbcluster_binlog_terminating is checked after each wait.
3826 pthread_mutex_lock(&injector_mutex
);
3827 while (!ndb_schema_share
||
3828 (ndb_binlog_running
&& !ndb_apply_status_share
))
3830 /* ndb not connected yet */
3831 struct timespec abstime
;
3832 set_timespec(abstime
, 1);
3833 pthread_cond_timedwait(&injector_cond
, &injector_mutex
, &abstime
);
3834 if (ndbcluster_binlog_terminating
)
3836 pthread_mutex_unlock(&injector_mutex
);
3840 pthread_mutex_unlock(&injector_mutex
);
// Seize a Thd_ndb for this THD the first time through (thd_ndb == NULL) and
// set TNO_NO_LOG_SCHEMA_OP in its options.
3842 if (thd_ndb
== NULL
)
3844 DBUG_ASSERT(ndbcluster_hton
->slot
!= ~(uint
)0);
3845 if (!(thd_ndb
= ha_ndbcluster::seize_thd_ndb()))
3847 sql_print_error("Could not allocate Thd_ndb object");
3850 set_thd_ndb(thd
, thd_ndb
);
3851 thd_ndb
->options
|= TNO_NO_LOG_SCHEMA_OP
;
3852 thd
->query_id
= 0; // to keep valgrind quiet
3857 // wait for the first event
3858 thd
->proc_info
= "Waiting for first event from ndbcluster";
3859 int schema_res
, res
;
3863 DBUG_PRINT("info", ("Waiting for the first event"));
3865 if (ndbcluster_binlog_terminating
)
3868 schema_res
= s_ndb
->pollEvents(100, &schema_gci
);
3869 } while (schema_gci
== 0 || ndb_latest_received_binlog_epoch
== schema_gci
);
// With binlogging enabled, poll i_ndb (10 per call) until its gci is no
// longer below schema_gci and differs from the last received epoch.
3870 if (ndb_binlog_running
)
3872 Uint64 gci
= i_ndb
->getLatestGCI();
3873 while (gci
< schema_gci
|| gci
== ndb_latest_received_binlog_epoch
)
3875 if (ndbcluster_binlog_terminating
)
3877 res
= i_ndb
->pollEvents(10, &gci
);
3879 if (gci
> schema_gci
)
3884 // now check that we have epochs consistent with what we had before the restart
3885 DBUG_PRINT("info", ("schema_res: %d schema_gci: %lu", schema_res
,
3886 (long) schema_gci
));
3888 i_ndb
->flushIncompleteEvents(schema_gci
);
3889 s_ndb
->flushIncompleteEvents(schema_gci
);
// An epoch lower than the last handled one is reported as a cluster restart
// (see the message) and all epoch bookkeeping is reset to 0. Otherwise, if
// an epoch had been applied before, a reconnect warning is logged.
3890 if (schema_gci
< ndb_latest_handled_binlog_epoch
)
3892 sql_print_error("NDB Binlog: cluster has been restarted --initial or with older filesystem. "
3893 "ndb_latest_handled_binlog_epoch: %u, while current epoch: %u. "
3894 "RESET MASTER should be issued. Resetting ndb_latest_handled_binlog_epoch.",
3895 (unsigned) ndb_latest_handled_binlog_epoch
, (unsigned) schema_gci
);
3896 *p_latest_trans_gci
= 0;
3897 ndb_latest_handled_binlog_epoch
= 0;
3898 ndb_latest_applied_binlog_epoch
= 0;
3899 ndb_latest_received_binlog_epoch
= 0;
3901 else if (ndb_latest_applied_binlog_epoch
> 0)
3903 sql_print_warning("NDB Binlog: cluster has reconnected. "
3904 "Changes to the database that occured while "
3905 "disconnected will not be in the binlog");
3907 if (ndb_extra_logging
)
3909 sql_print_information("NDB Binlog: starting log at epoch %u",
3910 (unsigned)schema_gci
);
3915 static char db
[]= "";
3917 if (ndb_binlog_running
)
3918 open_ndb_binlog_index(thd
, &binlog_tables
, &ndb_binlog_index
);
3921 do_ndbcluster_binlog_close_connection
= BCCC_running
;
// Event loop. It continues until (a) termination or a connection close has
// been requested and every epoch up to *p_latest_trans_gci has been handled,
// or (b) do_ndbcluster_binlog_close_connection becomes BCCC_restart.
3922 for ( ; !((ndbcluster_binlog_terminating
||
3923 do_ndbcluster_binlog_close_connection
) &&
3924 ndb_latest_handled_binlog_epoch
>= *p_latest_trans_gci
) &&
3925 do_ndbcluster_binlog_close_connection
!= BCCC_restart
; )
3928 if (do_ndbcluster_binlog_close_connection
)
3930 DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection: %d, "
3931 "ndb_latest_handled_binlog_epoch: %lu, "
3932 "*p_latest_trans_gci: %lu",
3933 do_ndbcluster_binlog_close_connection
,
3934 (ulong
) ndb_latest_handled_binlog_epoch
,
3935 (ulong
) *p_latest_trans_gci
));
3938 #ifdef RUN_NDB_BINLOG_TIMER
3940 sql_print_information("main_timer %ld ms", main_timer
.elapsed_ms());
3945 now we don't want any events before next gci is complete
3947 thd
->proc_info
= "Waiting for event from ndbcluster";
3950 /* wait for event or 1000 ms */
3951 Uint64 gci
= 0, schema_gci
;
3952 int res
= 0, tot_poll_wait
= 1000;
// Data connection: with binlogging enabled, poll i_ndb for up to
// tot_poll_wait; the other visible path polls with a zero wait and drains
// events with nextEvent() (see the "Just consume any events" text).
3953 if (ndb_binlog_running
)
3955 res
= i_ndb
->pollEvents(tot_poll_wait
, &gci
);
3961 Just consume any events, not used if no binlogging
3962 e.g. node failure events
3965 if (i_ndb
->pollEvents(0, &tmp_gci
))
3966 while (i_ndb
->nextEvent())
// Schema connection: poll s_ndb and record the data epoch just received.
3969 int schema_res
= s_ndb
->pollEvents(tot_poll_wait
, &schema_gci
);
3970 ndb_latest_received_binlog_epoch
= gci
;
// While the data epoch is ahead of the schema epoch (and polling has not
// returned a negative value), keep polling s_ndb and publish the progress
// through thd->proc_info using a static buffer.
3972 while (gci
> schema_gci
&& schema_res
>= 0)
3974 static char buf
[64];
3975 thd
->proc_info
= "Waiting for schema epoch";
3976 my_snprintf(buf
, sizeof(buf
), "%s %u(%u)", thd
->proc_info
, (unsigned) schema_gci
, (unsigned) gci
);
3977 thd
->proc_info
= buf
;
3978 schema_res
= s_ndb
->pollEvents(10, &schema_gci
);
3981 if ((ndbcluster_binlog_terminating
||
3982 do_ndbcluster_binlog_close_connection
) &&
3983 (ndb_latest_handled_binlog_epoch
>= *p_latest_trans_gci
||
3984 !ndb_binlog_running
))
3985 break; /* Shutting down server */
// If the open ndb_binlog_index table is older than refresh_version, close
// the thread's tables and forget the table pointer.
3987 if (ndb_binlog_index
&& ndb_binlog_index
->s
->version
< refresh_version
)
3989 if (ndb_binlog_index
->s
->version
< refresh_version
)
3991 close_thread_tables(thd
);
3992 ndb_binlog_index
= 0;
// Point the thread's THR_MALLOC slot at a local MEM_ROOT for this iteration;
// the previous root is saved in old_root and put back after free_root()
// further down.
3996 MEM_ROOT
**root_ptr
=
3997 my_pthread_getspecific_ptr(MEM_ROOT
**, THR_MALLOC
);
3998 MEM_ROOT
*old_root
= *root_ptr
;
4000 init_sql_alloc(&mem_root
, 4096, 0);
4001 List
<Cluster_schema
> post_epoch_log_list
;
4002 List
<Cluster_schema
> post_epoch_unlock_list
;
4003 *root_ptr
= &mem_root
;
// Schema events: handled when the schema poll returned a positive value.
// Events without error go to ndb_binlog_thread_handle_schema_event(), which
// is given the two post-epoch lists.
4005 if (unlikely(schema_res
> 0))
4007 thd
->proc_info
= "Processing events from schema table";
4009 setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip
);
4011 setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage
);
4012 NdbEventOperation
*pOp
= s_ndb
->nextEvent();
4015 if (!pOp
->hasError())
4017 ndb_binlog_thread_handle_schema_event(thd
, s_ndb
, pOp
,
4018 &post_epoch_log_list
,
4019 &post_epoch_unlock_list
,
4021 DBUG_PRINT("info", ("s_ndb first: %s", s_ndb
->getEventOperation() ?
4022 s_ndb
->getEventOperation()->getEvent()->getTable()->getName() :
4024 DBUG_PRINT("info", ("i_ndb first: %s", i_ndb
->getEventOperation() ?
4025 i_ndb
->getEventOperation()->getEvent()->getTable()->getName() :
// Neither Ndb object has an event operation left while still in the
// BCCC_running state: switch to BCCC_restart, and log an error if the latest
// transaction epoch is newer than the latest received one.
4027 if (i_ndb
->getEventOperation() == NULL
&&
4028 s_ndb
->getEventOperation() == NULL
&&
4029 do_ndbcluster_binlog_close_connection
== BCCC_running
)
4031 DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart"));
4032 do_ndbcluster_binlog_close_connection
= BCCC_restart
;
4033 if (ndb_latest_received_binlog_epoch
< *p_latest_trans_gci
&& ndb_binlog_running
)
4035 sql_print_error("NDB Binlog: latest transaction in epoch %lu not in binlog "
4036 "as latest received epoch is %lu",
4037 (ulong
) *p_latest_trans_gci
,
4038 (ulong
) ndb_latest_received_binlog_epoch
);
4043 sql_print_error("NDB: error %lu (%s) on handling "
4044 "binlog schema event",
4045 (ulong
) pOp
->getNdbError().code
,
4046 pOp
->getNdbError().message
);
4047 pOp
= s_ndb
->nextEvent();
// Data events: fetch the first event from i_ndb and prepare the
// ndb_binlog_index row and the injector transaction for the epoch.
4053 DBUG_PRINT("info", ("pollEvents res: %d", res
));
4054 thd
->proc_info
= "Processing events";
4055 NdbEventOperation
*pOp
= i_ndb
->nextEvent();
4056 ndb_binlog_index_row row
;
4059 #ifdef RUN_NDB_BINLOG_TIMER
4060 Timer gci_timer
, write_timer
;
4065 DBUG_PRINT("info", ("Handling gci: %d", (unsigned)gci
));
4066 // sometimes get TE_ALTER with invalid table
4067 DBUG_ASSERT(pOp
->getEventType() == NdbDictionary::Event::TE_ALTER
||
4068 ! IS_NDB_BLOB_PREFIX(pOp
->getEvent()->getTable()->getName()));
4069 DBUG_ASSERT(gci
<= ndb_latest_received_binlog_epoch
);
4071 /* initialize some variables for this epoch */
4072 g_ndb_log_slave_updates
= opt_log_slave_updates
;
4074 setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip
);
4075 i_ndb
->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage
);
4077 bzero((char*) &row
, sizeof(row
));
4078 thd
->variables
.character_set_client
= &my_charset_latin1
;
4079 injector::transaction trans
;
4080 // pass table map before epoch
// Walk the event operations of this gci. Operations whose only event type is
// TE_STOP, that have no share/table, that belong to ndb_apply_status_share,
// or that carry no insert/update/delete event get dedicated handling below;
// for the rest the table is registered with the injector transaction via
// use_table().
4083 const NdbEventOperation
*gci_op
;
4085 while ((gci_op
= i_ndb
->getGCIEventOperations(&iter
, &event_types
))
4088 NDB_SHARE
*share
= (NDB_SHARE
*)gci_op
->getCustomData();
4089 DBUG_PRINT("info", ("per gci_op: 0x%lx share: 0x%lx event_types: 0x%x",
4090 (long) gci_op
, (long) share
, event_types
));
4091 // workaround for interface returning TE_STOP events
4092 // which are normally filtered out below in the nextEvent loop
4093 if ((event_types
& ~NdbDictionary::Event::TE_STOP
) == 0)
4095 DBUG_PRINT("info", ("Skipped TE_STOP on table %s",
4096 gci_op
->getEvent()->getTable()->getName()));
4099 // this should not happen
4100 if (share
== NULL
|| share
->table
== NULL
)
4102 DBUG_PRINT("info", ("no share or table %s!",
4103 gci_op
->getEvent()->getTable()->getName()));
4106 if (share
== ndb_apply_status_share
)
4108 // skip this table, it is handled specially
4111 TABLE
*table
= share
->table
;
4113 const LEX_STRING
&name
= table
->s
->table_name
;
4115 if ((event_types
& (NdbDictionary::Event::TE_INSERT
|
4116 NdbDictionary::Event::TE_UPDATE
|
4117 NdbDictionary::Event::TE_DELETE
)) == 0)
4119 DBUG_PRINT("info", ("skipping non data event table: %.*s",
4120 (int) name
.length
, name
.str
));
4126 ("Found new data event, initializing transaction"));
4127 inj
->new_trans(thd
, &trans
);
4129 DBUG_PRINT("info", ("use_table: %.*s",
4130 (int) name
.length
, name
.str
));
4131 injector::transaction::table
tbl(table
, TRUE
);
4132 IF_DBUG(int ret
=) trans
.use_table(::server_id
, tbl
);
4133 DBUG_ASSERT(ret
== 0);
// Apply-status row: register the ndb_apply_status_share table and write one
// row with fields 0..4 set to server_id, the gci, an empty string, 0 and 0.
4138 if (ndb_apply_status_share
)
4140 TABLE
*table
= ndb_apply_status_share
->table
;
4143 const LEX_STRING
& name
= table
->s
->table_name
;
4144 DBUG_PRINT("info", ("use_table: %.*s",
4145 (int) name
.length
, name
.str
));
4147 injector::transaction::table
tbl(table
, TRUE
);
4148 IF_DBUG(int ret
=) trans
.use_table(::server_id
, tbl
);
4149 DBUG_ASSERT(ret
== 0);
4152 Intialize table->record[0]
4154 empty_record(table
);
4156 table
->field
[0]->store((longlong
)::server_id
);
4157 table
->field
[1]->store((longlong
)gci
);
4158 table
->field
[2]->store("", 0, &my_charset_bin
);
4159 table
->field
[3]->store((longlong
)0);
4160 table
->field
[4]->store((longlong
)0);
4161 trans
.write_row(::server_id
,
4162 injector::transaction::table(table
, TRUE
),
4163 &table
->s
->all_set
, table
->s
->fields
,
4168 sql_print_error("NDB: Could not get apply status share");
4171 #ifdef RUN_NDB_BINLOG_TIMER
4172 write_timer
.start();
4176 #ifdef RUN_NDB_BINLOG_TIMER
// Per-event handling for the current gci. Events flagged with an error are
// first passed to ndb_binlog_thread_handle_error().
4179 if (pOp
->hasError() &&
4180 ndb_binlog_thread_handle_error(i_ndb
, pOp
, row
) < 0)
4185 NDB_SHARE
*share
= (NDB_SHARE
*) pOp
->getCustomData();
4187 ("EVENT TYPE: %d GCI: %ld last applied: %ld "
4188 "share: 0x%lx (%s.%s)", pOp
->getEventType(),
4190 (long) ndb_latest_applied_binlog_epoch
,
4192 share
? share
->db
: "'NULL'",
4193 share
? share
->table_name
: "'NULL'"));
4194 DBUG_ASSERT(share
!= 0);
4196 // assert that there is consistancy between gci op list
4200 const NdbEventOperation
*gci_op
;
4202 while ((gci_op
= i_ndb
->getGCIEventOperations(&iter
, &event_types
))
4208 DBUG_ASSERT(gci_op
== pOp
);
4209 DBUG_ASSERT((event_types
& pOp
->getEventType()) != 0);
// Dispatch on the event type: values below TE_FIRST_NON_DATA_EVENT go to the
// data event handler (which receives the transaction); the non-data handler
// is called with the Ndb object's database/schema set from the event's
// table, and the database name is reset to "" afterwards.
4212 if ((unsigned) pOp
->getEventType() <
4213 (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT
)
4214 ndb_binlog_thread_handle_data_event(i_ndb
, pOp
, row
, trans
);
4217 // set injector_ndb database/schema from table internal name
4219 i_ndb
->setDatabaseAndSchemaName(pOp
->getEvent()->getTable());
4220 DBUG_ASSERT(ret
== 0);
4221 ndb_binlog_thread_handle_non_data_event(thd
, i_ndb
, pOp
, row
);
4222 // reset to catch errors
4223 i_ndb
->setDatabaseName("");
4224 DBUG_PRINT("info", ("s_ndb first: %s", s_ndb
->getEventOperation() ?
4225 s_ndb
->getEventOperation()->getEvent()->getTable()->getName() :
4227 DBUG_PRINT("info", ("i_ndb first: %s", i_ndb
->getEventOperation() ?
4228 i_ndb
->getEventOperation()->getEvent()->getTable()->getName() :
4230 if (i_ndb
->getEventOperation() == NULL
&&
4231 s_ndb
->getEventOperation() == NULL
&&
4232 do_ndbcluster_binlog_close_connection
== BCCC_running
)
4234 DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart"));
4235 do_ndbcluster_binlog_close_connection
= BCCC_restart
;
4236 if (ndb_latest_received_binlog_epoch
< *p_latest_trans_gci
&& ndb_binlog_running
)
4238 sql_print_error("NDB Binlog: latest transaction in epoch %lu not in binlog "
4239 "as latest received epoch is %lu",
4240 (ulong
) *p_latest_trans_gci
,
4241 (ulong
) ndb_latest_received_binlog_epoch
);
4246 pOp
= i_ndb
->nextEvent();
4247 } while (pOp
&& pOp
->getGCI() == gci
);
4250 note! pOp is not referring to an event in the next epoch
4253 #ifdef RUN_NDB_BINLOG_TIMER
4259 //DBUG_ASSERT(row.n_inserts || row.n_updates || row.n_deletes);
// Commit the injector transaction for this gci. The transaction's start
// position (file name and offset) is stored in the ndb_binlog_index row,
// which is written when ndb_update_ndb_binlog_index is set; the applied and
// handled epoch markers are then advanced to this gci.
4260 thd
->proc_info
= "Committing events to binlog";
4261 injector::transaction::binlog_pos start
= trans
.start_pos();
4262 if (int r
= trans
.commit())
4264 sql_print_error("NDB Binlog: "
4265 "Error during COMMIT of GCI. Error: %d",
4267 /* TODO: Further handling? */
4270 row
.master_log_file
= start
.file_name();
4271 row
.master_log_pos
= start
.file_pos();
4273 DBUG_PRINT("info", ("COMMIT gci: %lu", (ulong
) gci
));
4274 if (ndb_update_ndb_binlog_index
)
4275 ndb_add_ndb_binlog_index(thd
, &row
);
4276 ndb_latest_applied_binlog_epoch
= gci
;
4278 ndb_latest_handled_binlog_epoch
= gci
;
4279 #ifdef RUN_NDB_BINLOG_TIMER
4281 sql_print_information("gci %ld event_count %d write time "
4282 "%ld(%d e/s), total time %ld(%d e/s)",
4283 (ulong
)gci
, event_count
,
4284 write_timer
.elapsed_ms(),
4285 (1000*event_count
) / write_timer
.elapsed_ms(),
4286 gci_timer
.elapsed_ms(),
4287 (1000*event_count
) / gci_timer
.elapsed_ms());
// Post-epoch work for the schema events collected above, then release the
// local MEM_ROOT, restore the saved one and mark everything received so far
// as handled.
4292 ndb_binlog_thread_handle_schema_event_post_epoch(thd
,
4293 &post_epoch_log_list
,
4294 &post_epoch_unlock_list
);
4295 free_root(&mem_root
, MYF(0));
4296 *root_ptr
= old_root
;
4297 ndb_latest_handled_binlog_epoch
= ndb_latest_received_binlog_epoch
;
// Restart requested: mark the binlog tables as not initialised and close the
// thread's tables.
4299 if (do_ndbcluster_binlog_close_connection
== BCCC_restart
)
4301 ndb_binlog_tables_inited
= FALSE
;
4302 close_thread_tables(thd
);
4303 ndb_binlog_index
= 0;
// Shutdown sequence.
4307 sql_print_information("Stopping Cluster Binlog");
4308 DBUG_PRINT("info",("Shutting down cluster binlog thread"));
4309 thd
->proc_info
= "Shutting down";
4310 close_thread_tables(thd
);
4311 pthread_mutex_lock(&injector_mutex
);
4312 /* don't mess with the injector_ndb anymore from other threads */
4315 p_latest_trans_gci
= 0;
4317 pthread_mutex_unlock(&injector_mutex
);
4318 thd
->db
= 0; // as not to try to free memory
// Release the references held on the apply-status and schema shares; the
// schema share is released under ndb_schema_share_mutex.
4320 if (ndb_apply_status_share
)
4322 /* ndb_share reference binlog extra free */
4323 DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
4324 ndb_apply_status_share
->key
,
4325 ndb_apply_status_share
->use_count
));
4326 free_share(&ndb_apply_status_share
);
4327 ndb_apply_status_share
= 0;
4329 if (ndb_schema_share
)
4331 /* begin protect ndb_schema_share */
4332 pthread_mutex_lock(&ndb_schema_share_mutex
);
4333 /* ndb_share reference binlog extra free */
4334 DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
4335 ndb_schema_share
->key
,
4336 ndb_schema_share
->use_count
));
4337 free_share(&ndb_schema_share
);
4338 ndb_schema_share
= 0;
4339 ndb_binlog_tables_inited
= 0;
4340 pthread_mutex_unlock(&ndb_schema_share_mutex
);
4341 /* end protect ndb_schema_share */
4344 /* remove all event operations */
4347 NdbEventOperation
*op
;
4348 DBUG_PRINT("info",("removing all event operations"));
4349 while ((op
= s_ndb
->getEventOperation()))
4351 DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op
->getEvent()->getTable()->getName()));
4352 DBUG_PRINT("info",("removing event operation on %s",
4353 op
->getEvent()->getName()));
4354 NDB_SHARE
*share
= (NDB_SHARE
*) op
->getCustomData();
4355 DBUG_ASSERT(share
!= 0);
4356 DBUG_ASSERT(share
->op
== op
||
4357 share
->op_old
== op
);
4358 share
->op
= share
->op_old
= 0;
4359 /* ndb_share reference binlog free */
4360 DBUG_PRINT("NDB_SHARE", ("%s binlog free use_count: %u",
4361 share
->key
, share
->use_count
));
4363 s_ndb
->dropEventOperation(op
);
// Same clean-up for the event operations of the data connection (i_ndb).
4370 NdbEventOperation
*op
;
4371 DBUG_PRINT("info",("removing all event operations"));
4372 while ((op
= i_ndb
->getEventOperation()))
4374 DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op
->getEvent()->getTable()->getName()));
4375 DBUG_PRINT("info",("removing event operation on %s",
4376 op
->getEvent()->getName()));
4377 NDB_SHARE
*share
= (NDB_SHARE
*) op
->getCustomData();
4378 DBUG_ASSERT(share
!= 0);
4379 DBUG_ASSERT(share
->op
== op
||
4380 share
->op_old
== op
);
4381 share
->op
= share
->op_old
= 0;
4382 /* ndb_share reference binlog free */
4383 DBUG_PRINT("NDB_SHARE", ("%s binlog free use_count: %u",
4384 share
->key
, share
->use_count
));
4386 i_ndb
->dropEventOperation(op
);
4392 hash_free(&ndb_schema_objects
);
// Mark the thread as stopped (-1), turn binlogging off and signal
// injector_cond.
4398 ndb_binlog_thread_running
= -1;
4399 ndb_binlog_running
= FALSE
;
4400 (void) pthread_cond_signal(&injector_cond
);
4402 DBUG_PRINT("exit", ("ndb_binlog_thread"));
4404 DBUG_LEAVE
; // Must match DBUG_ENTER()
4407 return NULL
; // Avoid compiler warnings
// Reports the binlog injector's epoch counters through the stat_print
// callback, under the status name "binlog" for ndbcluster_hton_name.
//
// thd         passed through to stat_print.
// stat_print  callback that receives the formatted status text.
// stat_type   not referenced in the visible code.
//
// The text contains the latest epoch from injector_ndb->getLatestGCI(),
// *p_latest_trans_gci and the latest received/handled/applied binlog epochs,
// each rendered with llstr() into its own 22-byte buffer.
//
// NOTE(review): injector_mutex is released right after getLatestGCI(), while
// *p_latest_trans_gci is dereferenced afterwards; the binlog thread clears
// p_latest_trans_gci under that mutex during shutdown -- confirm this read
// cannot race with it.
4411 ndbcluster_show_status_binlog(THD
* thd
, stat_print_fn
*stat_print
,
4412 enum ha_stat_type stat_type
)
4416 ulonglong ndb_latest_epoch
= 0;
4417 DBUG_ENTER("ndbcluster_show_status_binlog");
4419 pthread_mutex_lock(&injector_mutex
);
4422 char buff1
[22],buff2
[22],buff3
[22],buff4
[22],buff5
[22];
4423 ndb_latest_epoch
= injector_ndb
->getLatestGCI();
4424 pthread_mutex_unlock(&injector_mutex
);
// Format all five counters into buf as comma-separated name=value pairs.
4427 snprintf(buf
, sizeof(buf
),
4429 "latest_trans_epoch=%s, "
4430 "latest_received_binlog_epoch=%s, "
4431 "latest_handled_binlog_epoch=%s, "
4432 "latest_applied_binlog_epoch=%s",
4433 llstr(ndb_latest_epoch
, buff1
),
4434 llstr(*p_latest_trans_gci
, buff2
),
4435 llstr(ndb_latest_received_binlog_epoch
, buff3
),
4436 llstr(ndb_latest_handled_binlog_epoch
, buff4
),
4437 llstr(ndb_latest_applied_binlog_epoch
, buff5
));
4438 if (stat_print(thd
, ndbcluster_hton_name
, ndbcluster_hton_name_length
,
4439 "binlog", strlen("binlog"),
// Second unlock in the visible code; presumably it belongs to the path on
// which the block above is not executed -- TODO confirm against the full
// source.
4444 pthread_mutex_unlock(&injector_mutex
);
4448 #endif /* HAVE_NDB_BINLOG */