mySQL 5.0.11 sources for tomato
[tomato.git] / release / src / router / mysql / sql / slave.cc
blobb2bcf37b7613109c148e5696a9c2ee049a1f45f5
1 /*
2 Copyright (c) 2000, 2012, Oracle and/or its affiliates. All rights reserved.
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 /**
20 @addtogroup Replication
23 @file
25 @brief Code to run the io thread and the sql thread on the
26 replication slave.
29 #include "mysql_priv.h"
31 #include <mysql.h>
32 #include <myisam.h>
33 #include "slave.h"
34 #include "rpl_mi.h"
35 #include "rpl_rli.h"
36 #include "sql_repl.h"
37 #include "rpl_filter.h"
38 #include "repl_failsafe.h"
39 #include <thr_alarm.h>
40 #include <my_dir.h>
41 #include <sql_common.h>
42 #include <errmsg.h>
43 #include <mysqld_error.h>
44 #include <mysys_err.h>
46 #ifdef HAVE_REPLICATION
48 #include "rpl_tblmap.h"
49 #include "debug_sync.h"
51 #define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
53 #define MAX_SLAVE_RETRY_PAUSE 5
54 bool use_slave_mask = 0;
55 MY_BITMAP slave_error_mask;
56 char slave_skip_error_names[SHOW_VAR_FUNC_BUFF_SIZE];
58 typedef bool (*CHECK_KILLED_FUNC)(THD*,void*);
60 char* slave_load_tmpdir = 0;
61 Master_info *active_mi= 0;
62 my_bool replicate_same_server_id;
63 ulonglong relay_log_space_limit = 0;
66 When slave thread exits, we need to remember the temporary tables so we
67 can re-use them on slave start.
69 TODO: move the vars below under Master_info
72 int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
73 int events_till_abort = -1;
/*
  First index of reconnect_messages[]: one entry per kind of failure
  (registration, binlog dump request, event read) that makes the I/O thread
  reconnect.
*/
enum enum_slave_reconnect_actions
{
  SLAVE_RECON_ACT_REG= 0,
  SLAVE_RECON_ACT_DUMP= 1,
  SLAVE_RECON_ACT_EVENT= 2,
  SLAVE_RECON_ACT_MAX
};

/*
  Second index of reconnect_messages[]: which of the texts belonging to a
  given reconnect action is wanted.
*/
enum enum_slave_reconnect_messages
{
  SLAVE_RECON_MSG_WAIT= 0,
  SLAVE_RECON_MSG_KILLED_WAITING= 1,
  SLAVE_RECON_MSG_AFTER= 2,
  SLAVE_RECON_MSG_FAILED= 3,
  SLAVE_RECON_MSG_COMMAND= 4,
  SLAVE_RECON_MSG_KILLED_AFTER= 5,
  SLAVE_RECON_MSG_MAX
};

/*
  Message texts, addressed as reconnect_messages[action][message].
  The SLAVE_RECON_MSG_FAILED entries are format strings taking two '%s'
  arguments (log name and position).
*/
static const char *reconnect_messages[SLAVE_RECON_ACT_MAX][SLAVE_RECON_MSG_MAX]=
{
  /* SLAVE_RECON_ACT_REG */
  {
    "Waiting to reconnect after a failed registration on master",
    "Slave I/O thread killed while waitnig to reconnect after a failed "
    "registration on master",
    "Reconnecting after a failed registration on master",
    "failed registering on master, reconnecting to try again, "
    "log '%s' at position %s",
    "COM_REGISTER_SLAVE",
    "Slave I/O thread killed during or after reconnect"
  },
  /* SLAVE_RECON_ACT_DUMP */
  {
    "Waiting to reconnect after a failed binlog dump request",
    "Slave I/O thread killed while retrying master dump",
    "Reconnecting after a failed binlog dump request",
    "failed dump request, reconnecting to try again, log '%s' at position %s",
    "COM_BINLOG_DUMP",
    "Slave I/O thread killed during or after reconnect"
  },
  /* SLAVE_RECON_ACT_EVENT */
  {
    "Waiting to reconnect after a failed master event read",
    "Slave I/O thread killed while waiting to reconnect after a failed read",
    "Reconnecting after a failed master event read",
    "Slave I/O thread: Failed reading log event, reconnecting to retry, "
    "log '%s' at position %s",
    "",
    "Slave I/O thread killed during or after a reconnect done to recover from "
    "failed read"
  }
};
127 typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
129 static int process_io_rotate(Master_info* mi, Rotate_log_event* rev);
130 static int process_io_create_file(Master_info* mi, Create_file_log_event* cev);
131 static bool wait_for_relay_log_space(Relay_log_info* rli);
132 static inline bool io_slave_killed(THD* thd,Master_info* mi);
133 static inline bool sql_slave_killed(THD* thd,Relay_log_info* rli);
134 static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
135 static void print_slave_skip_errors(void);
136 static int safe_connect(THD* thd, MYSQL* mysql, Master_info* mi);
137 static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
138 bool suppress_warnings);
139 static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
140 bool reconnect, bool suppress_warnings);
141 static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
142 void* thread_killed_arg);
143 static int request_table_dump(MYSQL* mysql, const char* db, const char* table);
144 static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db,
145 const char* table_name, bool overwrite);
146 static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi);
147 static Log_event* next_event(Relay_log_info* rli);
148 static int queue_event(Master_info* mi,const char* buf,ulong event_len);
149 static int terminate_slave_thread(THD *thd,
150 pthread_mutex_t *term_lock,
151 pthread_cond_t *term_cond,
152 volatile uint *slave_running,
153 bool skip_lock);
154 static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
157 Find out which replications threads are running
159 SYNOPSIS
160 init_thread_mask()
161 mask Return value here
162 mi master_info for slave
163 inverse If set, returns which threads are not running
165 IMPLEMENTATION
166 Get a bit mask for which threads are running so that we can later restart
167 these threads.
169 RETURN
170 mask If inverse == 0, running threads
171 If inverse == 1, stopped threads
174 void init_thread_mask(int* mask,Master_info* mi,bool inverse)
176 bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
177 register int tmp_mask=0;
178 DBUG_ENTER("init_thread_mask");
180 if (set_io)
181 tmp_mask |= SLAVE_IO;
182 if (set_sql)
183 tmp_mask |= SLAVE_SQL;
184 if (inverse)
185 tmp_mask^= (SLAVE_IO | SLAVE_SQL);
186 *mask = tmp_mask;
187 DBUG_VOID_RETURN;
192 lock_slave_threads()
195 void lock_slave_threads(Master_info* mi)
197 DBUG_ENTER("lock_slave_threads");
199 //TODO: see if we can do this without dual mutex
200 pthread_mutex_lock(&mi->run_lock);
201 pthread_mutex_lock(&mi->rli.run_lock);
202 DBUG_VOID_RETURN;
207 unlock_slave_threads()
210 void unlock_slave_threads(Master_info* mi)
212 DBUG_ENTER("unlock_slave_threads");
214 //TODO: see if we can do this without dual mutex
215 pthread_mutex_unlock(&mi->rli.run_lock);
216 pthread_mutex_unlock(&mi->run_lock);
217 DBUG_VOID_RETURN;
221 /* Initialize slave structures */
223 int init_slave()
225 DBUG_ENTER("init_slave");
228 This is called when mysqld starts. Before client connections are
229 accepted. However bootstrap may conflict with us if it does START SLAVE.
230 So it's safer to take the lock.
232 pthread_mutex_lock(&LOCK_active_mi);
234 TODO: re-write this to interate through the list of files
235 for multi-master
237 active_mi= new Master_info;
240 If --slave-skip-errors=... was not used, the string value for the
241 system variable has not been set up yet. Do it now.
243 if (!use_slave_mask)
245 print_slave_skip_errors();
249 If master_host is not specified, try to read it from the master_info file.
250 If master_host is specified, create the master_info file if it doesn't
251 exists.
253 if (!active_mi)
255 sql_print_error("Failed to allocate memory for the master info structure");
256 goto err;
259 if (init_master_info(active_mi,master_info_file,relay_log_info_file,
260 !master_host, (SLAVE_IO | SLAVE_SQL)))
262 sql_print_error("Failed to initialize the master info structure");
263 goto err;
266 if (server_id && !master_host && active_mi->host[0])
267 master_host= active_mi->host;
269 /* If server id is not set, start_slave_thread() will say it */
271 if (master_host && !opt_skip_slave_start)
273 if (start_slave_threads(1 /* need mutex */,
274 0 /* no wait for start*/,
275 active_mi,
276 master_info_file,
277 relay_log_info_file,
278 SLAVE_IO | SLAVE_SQL))
280 sql_print_error("Failed to create slave threads");
281 goto err;
284 pthread_mutex_unlock(&LOCK_active_mi);
285 DBUG_RETURN(0);
287 err:
288 pthread_mutex_unlock(&LOCK_active_mi);
289 DBUG_RETURN(1);
294 Convert slave skip errors bitmap into a printable string.
297 static void print_slave_skip_errors(void)
300 To be safe, we want 10 characters of room in the buffer for a number
301 plus terminators. Also, we need some space for constant strings.
302 10 characters must be sufficient for a number plus {',' | '...'}
303 plus a NUL terminator. That is a max 6 digit number.
305 const size_t MIN_ROOM= 10;
306 DBUG_ENTER("print_slave_skip_errors");
307 DBUG_ASSERT(sizeof(slave_skip_error_names) > MIN_ROOM);
308 DBUG_ASSERT(MAX_SLAVE_ERROR <= 999999); // 6 digits
310 if (!use_slave_mask || bitmap_is_clear_all(&slave_error_mask))
312 /* purecov: begin tested */
313 memcpy(slave_skip_error_names, STRING_WITH_LEN("OFF"));
314 /* purecov: end */
316 else if (bitmap_is_set_all(&slave_error_mask))
318 /* purecov: begin tested */
319 memcpy(slave_skip_error_names, STRING_WITH_LEN("ALL"));
320 /* purecov: end */
322 else
324 char *buff= slave_skip_error_names;
325 char *bend= buff + sizeof(slave_skip_error_names);
326 int errnum;
328 for (errnum= 0; errnum < MAX_SLAVE_ERROR; errnum++)
330 if (bitmap_is_set(&slave_error_mask, errnum))
332 if (buff + MIN_ROOM >= bend)
333 break; /* purecov: tested */
334 buff= int10_to_str(errnum, buff, 10);
335 *buff++= ',';
338 if (buff != slave_skip_error_names)
339 buff--; // Remove last ','
340 if (errnum < MAX_SLAVE_ERROR)
342 /* Couldn't show all errors */
343 buff= strmov(buff, "..."); /* purecov: tested */
345 *buff=0;
347 DBUG_PRINT("init", ("error_names: '%s'", slave_skip_error_names));
348 DBUG_VOID_RETURN;
352 Init function to set up array for errors that should be skipped for slave
354 SYNOPSIS
355 init_slave_skip_errors()
356 arg List of errors numbers to skip, separated with ','
358 NOTES
359 Called from get_options() in mysqld.cc on start-up
362 void init_slave_skip_errors(const char* arg)
364 const char *p;
365 DBUG_ENTER("init_slave_skip_errors");
367 if (bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0))
369 fprintf(stderr, "Badly out of memory, please check your system status\n");
370 exit(1);
372 use_slave_mask = 1;
373 for (;my_isspace(system_charset_info,*arg);++arg)
374 /* empty */;
375 if (!my_strnncoll(system_charset_info,(uchar*)arg,4,(const uchar*)"all",4))
377 bitmap_set_all(&slave_error_mask);
378 print_slave_skip_errors();
379 DBUG_VOID_RETURN;
381 for (p= arg ; *p; )
383 long err_code;
384 if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code)))
385 break;
386 if (err_code < MAX_SLAVE_ERROR)
387 bitmap_set_bit(&slave_error_mask,(uint)err_code);
388 while (!my_isdigit(system_charset_info,*p) && *p)
389 p++;
391 /* Convert slave skip errors bitmap into a printable string. */
392 print_slave_skip_errors();
393 DBUG_VOID_RETURN;
396 static void set_thd_in_use_temporary_tables(Relay_log_info *rli)
398 TABLE *table;
400 for (table= rli->save_temporary_tables ; table ; table= table->next)
401 table->in_use= rli->sql_thd;
404 int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
406 DBUG_ENTER("terminate_slave_threads");
408 if (!mi->inited)
409 DBUG_RETURN(0); /* successfully do nothing */
410 int error,force_all = (thread_mask & SLAVE_FORCE_ALL);
411 pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
413 if (thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL))
415 DBUG_PRINT("info",("Terminating SQL thread"));
416 mi->rli.abort_slave=1;
417 if ((error=terminate_slave_thread(mi->rli.sql_thd, sql_lock,
418 &mi->rli.stop_cond,
419 &mi->rli.slave_running,
420 skip_lock)) &&
421 !force_all)
422 DBUG_RETURN(error);
424 if (thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL))
426 DBUG_PRINT("info",("Terminating IO thread"));
427 mi->abort_slave=1;
428 if ((error=terminate_slave_thread(mi->io_thd, io_lock,
429 &mi->stop_cond,
430 &mi->slave_running,
431 skip_lock)) &&
432 !force_all)
433 DBUG_RETURN(error);
435 DBUG_RETURN(0);
440 Wait for a slave thread to terminate.
442 This function is called after requesting the thread to terminate
443 (by setting @c abort_slave member of @c Relay_log_info or @c
444 Master_info structure to 1). Termination of the thread is
445 controlled with the the predicate <code>*slave_running</code>.
447 Function will acquire @c term_lock before waiting on the condition
448 unless @c skip_lock is true in which case the mutex should be owned
449 by the caller of this function and will remain acquired after
450 return from the function.
452 @param term_lock
453 Associated lock to use when waiting for @c term_cond
455 @param term_cond
456 Condition that is signalled when the thread has terminated
458 @param slave_running
459 Pointer to predicate to check for slave thread termination
461 @param skip_lock
462 If @c true the lock will not be acquired before waiting on
463 the condition. In this case, it is assumed that the calling
464 function acquires the lock before calling this function.
466 @retval 0 All OK ER_SLAVE_NOT_RUNNING otherwise.
468 @note If the executing thread has to acquire term_lock (skip_lock
469 is false), the negative running status does not represent
470 any issue therefore no error is reported.
473 static int
474 terminate_slave_thread(THD *thd,
475 pthread_mutex_t *term_lock,
476 pthread_cond_t *term_cond,
477 volatile uint *slave_running,
478 bool skip_lock)
480 DBUG_ENTER("terminate_slave_thread");
481 if (!skip_lock)
483 pthread_mutex_lock(term_lock);
485 else
487 safe_mutex_assert_owner(term_lock);
489 if (!*slave_running)
491 if (!skip_lock)
494 if run_lock (term_lock) is acquired locally then either
495 slave_running status is fine
497 pthread_mutex_unlock(term_lock);
498 DBUG_RETURN(0);
500 else
502 DBUG_RETURN(ER_SLAVE_NOT_RUNNING);
505 DBUG_ASSERT(thd != 0);
506 THD_CHECK_SENTRY(thd);
509 Is is critical to test if the slave is running. Otherwise, we might
510 be referening freed memory trying to kick it
513 while (*slave_running) // Should always be true
515 int error;
516 DBUG_PRINT("loop", ("killing slave thread"));
518 pthread_mutex_lock(&thd->LOCK_thd_kill);
519 #ifndef DONT_USE_THR_ALARM
521 Error codes from pthread_kill are:
522 EINVAL: invalid signal number (can't happen)
523 ESRCH: thread already killed (can happen, should be ignored)
525 IF_DBUG(int err= ) pthread_kill(thd->real_id, thr_client_alarm);
526 DBUG_ASSERT(err != EINVAL);
527 #endif
528 thd->awake(THD::NOT_KILLED);
529 pthread_mutex_unlock(&thd->LOCK_thd_kill);
532 There is a small chance that slave thread might miss the first
533 alarm. To protect againts it, resend the signal until it reacts
535 struct timespec abstime;
536 set_timespec(abstime,2);
537 error= pthread_cond_timedwait(term_cond, term_lock, &abstime);
538 DBUG_ASSERT(error == ETIMEDOUT || error == 0);
541 DBUG_ASSERT(*slave_running == 0);
543 if (!skip_lock)
544 pthread_mutex_unlock(term_lock);
545 DBUG_RETURN(0);
549 int start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
550 pthread_mutex_t *cond_lock,
551 pthread_cond_t *start_cond,
552 volatile uint *slave_running,
553 volatile ulong *slave_run_id,
554 Master_info* mi,
555 bool high_priority)
557 pthread_t th;
558 ulong start_id;
559 DBUG_ENTER("start_slave_thread");
561 DBUG_ASSERT(mi->inited);
563 if (start_lock)
564 pthread_mutex_lock(start_lock);
565 if (!server_id)
567 if (start_cond)
568 pthread_cond_broadcast(start_cond);
569 if (start_lock)
570 pthread_mutex_unlock(start_lock);
571 sql_print_error("Server id not set, will not start slave");
572 DBUG_RETURN(ER_BAD_SLAVE);
575 if (*slave_running)
577 if (start_cond)
578 pthread_cond_broadcast(start_cond);
579 if (start_lock)
580 pthread_mutex_unlock(start_lock);
581 DBUG_RETURN(ER_SLAVE_MUST_STOP);
583 start_id= *slave_run_id;
584 DBUG_PRINT("info",("Creating new slave thread"));
585 if (high_priority)
586 my_pthread_attr_setprio(&connection_attrib,CONNECT_PRIOR);
587 if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
589 if (start_lock)
590 pthread_mutex_unlock(start_lock);
591 DBUG_RETURN(ER_SLAVE_THREAD);
593 if (start_cond && cond_lock) // caller has cond_lock
595 THD* thd = current_thd;
596 while (start_id == *slave_run_id)
598 DBUG_PRINT("sleep",("Waiting for slave thread to start"));
599 const char* old_msg = thd->enter_cond(start_cond,cond_lock,
600 "Waiting for slave thread to start");
601 pthread_cond_wait(start_cond, cond_lock);
602 thd->exit_cond(old_msg);
603 pthread_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
604 if (thd->killed)
606 if (start_lock)
607 pthread_mutex_unlock(start_lock);
608 DBUG_RETURN(thd->killed_errno());
612 if (start_lock)
613 pthread_mutex_unlock(start_lock);
614 DBUG_RETURN(0);
619 start_slave_threads()
621 NOTES
622 SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
623 sense to do that for starting a slave--we always care if it actually
624 started the threads that were not previously running
627 int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
628 Master_info* mi, const char* master_info_fname,
629 const char* slave_info_fname, int thread_mask)
631 pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
632 pthread_cond_t* cond_io=0,*cond_sql=0;
633 int error=0;
634 DBUG_ENTER("start_slave_threads");
636 if (need_slave_mutex)
638 lock_io = &mi->run_lock;
639 lock_sql = &mi->rli.run_lock;
641 if (wait_for_start)
643 cond_io = &mi->start_cond;
644 cond_sql = &mi->rli.start_cond;
645 lock_cond_io = &mi->run_lock;
646 lock_cond_sql = &mi->rli.run_lock;
649 if (thread_mask & SLAVE_IO)
650 error=start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
651 cond_io,
652 &mi->slave_running, &mi->slave_run_id,
653 mi, 1); //high priority, to read the most possible
654 if (!error && (thread_mask & SLAVE_SQL))
656 error=start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
657 cond_sql,
658 &mi->rli.slave_running, &mi->rli.slave_run_id,
659 mi, 0);
660 if (error)
661 terminate_slave_threads(mi, thread_mask & SLAVE_IO, !need_slave_mutex);
663 DBUG_RETURN(error);
#ifdef NOT_USED_YET
/*
  Walk callback that calls end_master_info() on one Master_info.
  The second argument is ignored. Always returns 0.
*/
static int end_slave_on_walk(Master_info* mi, uchar* /*unused*/)
{
  DBUG_ENTER("end_slave_on_walk");

  end_master_info(mi);
  DBUG_RETURN(0);
}
#endif
679 Release slave threads at time of executing shutdown.
681 SYNOPSIS
682 end_slave()
685 void end_slave()
687 DBUG_ENTER("end_slave");
690 This is called when the server terminates, in close_connections().
691 It terminates slave threads. However, some CHANGE MASTER etc may still be
692 running presently. If a START SLAVE was in progress, the mutex lock below
693 will make us wait until slave threads have started, and START SLAVE
694 returns, then we terminate them here.
696 pthread_mutex_lock(&LOCK_active_mi);
697 if (active_mi)
700 TODO: replace the line below with
701 list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
702 once multi-master code is ready.
704 terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
706 pthread_mutex_unlock(&LOCK_active_mi);
707 DBUG_VOID_RETURN;
711 Free all resources used by slave threads at time of executing shutdown.
712 The routine must be called after all possible users of @c active_mi
713 have left.
715 SYNOPSIS
716 close_active_mi()
719 void close_active_mi()
721 pthread_mutex_lock(&LOCK_active_mi);
722 if (active_mi)
724 end_master_info(active_mi);
725 delete active_mi;
726 active_mi= 0;
728 pthread_mutex_unlock(&LOCK_active_mi);
731 static bool io_slave_killed(THD* thd, Master_info* mi)
733 DBUG_ENTER("io_slave_killed");
735 DBUG_ASSERT(mi->io_thd == thd);
736 DBUG_ASSERT(mi->slave_running); // tracking buffer overrun
737 DBUG_RETURN(mi->abort_slave || abort_loop || thd->killed);
741 static bool sql_slave_killed(THD* thd, Relay_log_info* rli)
743 DBUG_ENTER("sql_slave_killed");
745 DBUG_ASSERT(rli->sql_thd == thd);
746 DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun
747 if (abort_loop || thd->killed || rli->abort_slave)
750 The transaction should always be binlogged if OPTION_KEEP_LOG is set
751 (it implies that something can not be rolled back). And such case
752 should be regarded similarly as modifing a non-transactional table
753 because retrying of the transaction will lead to an error or inconsistency
754 as well.
755 Example: OPTION_KEEP_LOG is set if a temporary table is created or dropped.
757 if (rli->abort_slave && rli->is_in_group() &&
758 (thd->transaction.all.modified_non_trans_table ||
759 (thd->options & OPTION_KEEP_LOG)))
760 DBUG_RETURN(0);
762 If we are in an unsafe situation (stopping could corrupt replication),
763 we give one minute to the slave SQL thread of grace before really
764 terminating, in the hope that it will be able to read more events and
765 the unsafe situation will soon be left. Note that this one minute starts
766 from the last time anything happened in the slave SQL thread. So it's
767 really one minute of idleness, we don't timeout if the slave SQL thread
768 is actively working.
770 if (rli->last_event_start_time == 0)
771 DBUG_RETURN(1);
772 DBUG_PRINT("info", ("Slave SQL thread is in an unsafe situation, giving "
773 "it some grace period"));
774 if (difftime(time(0), rli->last_event_start_time) > 60)
776 rli->report(ERROR_LEVEL, 0,
777 "SQL thread had to stop in an unsafe situation, in "
778 "the middle of applying updates to a "
779 "non-transactional table without any primary key. "
780 "There is a risk of duplicate updates when the slave "
781 "SQL thread is restarted. Please check your tables' "
782 "contents after restart.");
783 DBUG_RETURN(1);
786 DBUG_RETURN(0);
791 skip_load_data_infile()
793 NOTES
794 This is used to tell a 3.23 master to break send_file()
797 void skip_load_data_infile(NET *net)
799 DBUG_ENTER("skip_load_data_infile");
801 (void)net_request_file(net, "/dev/null");
802 (void)my_net_read(net); // discard response
803 (void)net_write_command(net, 0, (uchar*) "", 0, (uchar*) "", 0); // ok
804 DBUG_VOID_RETURN;
808 bool net_request_file(NET* net, const char* fname)
810 DBUG_ENTER("net_request_file");
811 DBUG_RETURN(net_write_command(net, 251, (uchar*) fname, strlen(fname),
812 (uchar*) "", 0));
816 From other comments and tests in code, it looks like
817 sometimes Query_log_event and Load_log_event can have db == 0
818 (see rewrite_db() above for example)
819 (cases where this happens are unclear; it may be when the master is 3.23).
822 const char *print_slave_db_safe(const char* db)
824 DBUG_ENTER("*print_slave_db_safe");
826 DBUG_RETURN((db ? db : ""));
829 int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
830 const char *default_val)
832 uint length;
833 DBUG_ENTER("init_strvar_from_file");
835 if ((length=my_b_gets(f,var, max_size)))
837 char* last_p = var + length -1;
838 if (*last_p == '\n')
839 *last_p = 0; // if we stopped on newline, kill it
840 else
843 If we truncated a line or stopped on last char, remove all chars
844 up to and including newline.
846 int c;
847 while (((c=my_b_get(f)) != '\n' && c != my_b_EOF)) ;
849 DBUG_RETURN(0);
851 else if (default_val)
853 strmake(var, default_val, max_size-1);
854 DBUG_RETURN(0);
856 DBUG_RETURN(1);
860 int init_intvar_from_file(int* var, IO_CACHE* f, int default_val)
862 char buf[32];
863 DBUG_ENTER("init_intvar_from_file");
866 if (my_b_gets(f, buf, sizeof(buf)))
868 *var = atoi(buf);
869 DBUG_RETURN(0);
871 else if (default_val)
873 *var = default_val;
874 DBUG_RETURN(0);
876 DBUG_RETURN(1);
881 Check if the error is caused by network.
882 @param[in] errorno Number of the error.
883 RETURNS:
884 TRUE network error
885 FALSE not network error
888 bool is_network_error(uint errorno)
890 if (errorno == CR_CONNECTION_ERROR ||
891 errorno == CR_CONN_HOST_ERROR ||
892 errorno == CR_SERVER_GONE_ERROR ||
893 errorno == CR_SERVER_LOST ||
894 errorno == ER_CON_COUNT_ERROR ||
895 errorno == ER_SERVER_SHUTDOWN)
896 return TRUE;
898 return FALSE;
903 Note that we rely on the master's version (3.23, 4.0.14 etc) instead of
904 relying on the binlog's version. This is not perfect: imagine an upgrade
905 of the master without waiting that all slaves are in sync with the master;
906 then a slave could be fooled about the binlog's format. This is what happens
907 when people upgrade a 3.23 master to 4.0 without doing RESET MASTER: 4.0
908 slaves are fooled. So we do this only to distinguish between 3.23 and more
909 recent masters (it's too late to change things for 3.23).
911 RETURNS
912 0 ok
913 1 error
914 2 transient network problem, the caller should try to reconnect
917 static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
919 char err_buff[MAX_SLAVE_ERRMSG];
920 const char* errmsg= 0;
921 int err_code= 0;
922 MYSQL_RES *master_res= 0;
923 MYSQL_ROW master_row;
924 DBUG_ENTER("get_master_version_and_clock");
927 Free old description_event_for_queue (that is needed if we are in
928 a reconnection).
930 delete mi->rli.relay_log.description_event_for_queue;
931 mi->rli.relay_log.description_event_for_queue= 0;
933 if (!my_isdigit(&my_charset_bin,*mysql->server_version))
935 errmsg = "Master reported unrecognized MySQL version";
936 err_code= ER_SLAVE_FATAL_ERROR;
937 sprintf(err_buff, ER(err_code), errmsg);
939 else
942 Note the following switch will bug when we have MySQL branch 30 ;)
944 switch (*mysql->server_version)
946 case '0':
947 case '1':
948 case '2':
949 errmsg = "Master reported unrecognized MySQL version";
950 err_code= ER_SLAVE_FATAL_ERROR;
951 sprintf(err_buff, ER(err_code), errmsg);
952 break;
953 case '3':
954 mi->rli.relay_log.description_event_for_queue= new
955 Format_description_log_event(1, mysql->server_version);
956 break;
957 case '4':
958 mi->rli.relay_log.description_event_for_queue= new
959 Format_description_log_event(3, mysql->server_version);
960 break;
961 default:
963 Master is MySQL >=5.0. Give a default Format_desc event, so that we can
964 take the early steps (like tests for "is this a 3.23 master") which we
965 have to take before we receive the real master's Format_desc which will
966 override this one. Note that the Format_desc we create below is garbage
967 (it has the format of the *slave*); it's only good to help know if the
968 master is 3.23, 4.0, etc.
970 mi->rli.relay_log.description_event_for_queue= new
971 Format_description_log_event(4, mysql->server_version);
972 break;
977 This does not mean that a 5.0 slave will be able to read a 6.0 master; but
978 as we don't know yet, we don't want to forbid this for now. If a 5.0 slave
979 can't read a 6.0 master, this will show up when the slave can't read some
980 events sent by the master, and there will be error messages.
983 if (errmsg)
984 goto err;
986 /* as we are here, we tried to allocate the event */
987 if (!mi->rli.relay_log.description_event_for_queue)
989 errmsg= "default Format_description_log_event";
990 err_code= ER_SLAVE_CREATE_EVENT_FAILURE;
991 sprintf(err_buff, ER(err_code), errmsg);
992 goto err;
996 Compare the master and slave's clock. Do not die if master's clock is
997 unavailable (very old master not supporting UNIX_TIMESTAMP()?).
1000 DBUG_EXECUTE_IF("dbug.before_get_UNIX_TIMESTAMP",
1002 const char act[]=
1003 "now "
1004 "wait_for signal.get_unix_timestamp";
1005 DBUG_ASSERT(opt_debug_sync_timeout > 0);
1006 DBUG_ASSERT(!debug_sync_set_action(current_thd,
1007 STRING_WITH_LEN(act)));
1008 };);
1010 master_res= NULL;
1011 if (!mysql_real_query(mysql, STRING_WITH_LEN("SELECT UNIX_TIMESTAMP()")) &&
1012 (master_res= mysql_store_result(mysql)) &&
1013 (master_row= mysql_fetch_row(master_res)))
1015 mi->clock_diff_with_master=
1016 (long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10));
1018 else if (is_network_error(mysql_errno(mysql)))
1020 mi->report(WARNING_LEVEL, mysql_errno(mysql),
1021 "Get master clock failed with error: %s", mysql_error(mysql));
1022 goto network_err;
1024 else
1026 mi->clock_diff_with_master= 0; /* The "most sensible" value */
1027 sql_print_warning("\"SELECT UNIX_TIMESTAMP()\" failed on master, "
1028 "do not trust column Seconds_Behind_Master of SHOW "
1029 "SLAVE STATUS. Error: %s (%d)",
1030 mysql_error(mysql), mysql_errno(mysql));
1032 if (master_res)
1034 mysql_free_result(master_res);
1035 master_res= NULL;
1039 Check that the master's server id and ours are different. Because if they
1040 are equal (which can result from a simple copy of master's datadir to slave,
1041 thus copying some my.cnf), replication will work but all events will be
1042 skipped.
1043 Do not die if SHOW VARIABLES LIKE 'SERVER_ID' fails on master (very old
1044 master?).
1045 Note: we could have put a @@SERVER_ID in the previous SELECT
1046 UNIX_TIMESTAMP() instead, but this would not have worked on 3.23 masters.
1048 DBUG_EXECUTE_IF("dbug.before_get_SERVER_ID",
1050 const char act[]=
1051 "now "
1052 "wait_for signal.get_server_id";
1053 DBUG_ASSERT(opt_debug_sync_timeout > 0);
1054 DBUG_ASSERT(!debug_sync_set_action(current_thd,
1055 STRING_WITH_LEN(act)));
1056 };);
1057 master_res= NULL;
1058 master_row= NULL;
1059 if (!mysql_real_query(mysql,
1060 STRING_WITH_LEN("SHOW VARIABLES LIKE 'SERVER_ID'")) &&
1061 (master_res= mysql_store_result(mysql)) &&
1062 (master_row= mysql_fetch_row(master_res)))
1064 if ((::server_id == strtoul(master_row[1], 0, 10)) &&
1065 !mi->rli.replicate_same_server_id)
1067 errmsg= "The slave I/O thread stops because master and slave have equal \
1068 MySQL server ids; these ids must be different for replication to work (or \
1069 the --replicate-same-server-id option must be used on slave but this does \
1070 not always make sense; please check the manual before using it).";
1071 err_code= ER_SLAVE_FATAL_ERROR;
1072 sprintf(err_buff, ER(err_code), errmsg);
1073 goto err;
1076 else if (mysql_errno(mysql))
1078 if (is_network_error(mysql_errno(mysql)))
1080 mi->report(WARNING_LEVEL, mysql_errno(mysql),
1081 "Get master SERVER_ID failed with error: %s", mysql_error(mysql));
1082 goto network_err;
1084 /* Fatal error */
1085 errmsg= "The slave I/O thread stops because a fatal error is encountered \
1086 when it try to get the value of SERVER_ID variable from master.";
1087 err_code= mysql_errno(mysql);
1088 sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
1089 goto err;
1091 else if (!master_row && master_res)
1093 mi->report(WARNING_LEVEL, ER_UNKNOWN_SYSTEM_VARIABLE,
1094 "Unknown system variable 'SERVER_ID' on master, \
1095 maybe it is a *VERY OLD MASTER*.");
1097 if (master_res)
1099 mysql_free_result(master_res);
1100 master_res= NULL;
1104 Check that the master's global character_set_server and ours are the same.
1105 Not fatal if query fails (old master?).
1106 Note that we don't check for equality of global character_set_client and
1107 collation_connection (neither do we prevent their setting in
1108 set_var.cc). That's because from what I (Guilhem) have tested, the global
1109 values of these 2 are never used (new connections don't use them).
1110 We don't test equality of global collation_database either as it is
1111 going to be deprecated (made read-only) in 4.1 very soon.
1112 The test is only relevant if master < 5.0.3 (we'll test only if it's older
1113 than the 5 branch; < 5.0.3 was alpha...), as >= 5.0.3 master stores
1114 charset info in each binlog event.
1115 We don't do it for 3.23 because masters <3.23.50 hang on
1116 SELECT @@unknown_var (BUG#7965 - see changelog of 3.23.50). So finally we
1117 test only if master is 4.x.
1120 /* redundant with rest of code but safer against later additions */
1121 if (*mysql->server_version == '3')
1122 goto err;
1124 if (*mysql->server_version == '4')
1126 master_res= NULL;
1127 if (!mysql_real_query(mysql,
1128 STRING_WITH_LEN("SELECT @@GLOBAL.COLLATION_SERVER")) &&
1129 (master_res= mysql_store_result(mysql)) &&
1130 (master_row= mysql_fetch_row(master_res)))
1132 if (strcmp(master_row[0], global_system_variables.collation_server->name))
1134 errmsg= "The slave I/O thread stops because master and slave have \
1135 different values for the COLLATION_SERVER global variable. The values must \
1136 be equal for the Statement-format replication to work";
1137 err_code= ER_SLAVE_FATAL_ERROR;
1138 sprintf(err_buff, ER(err_code), errmsg);
1139 goto err;
1142 else if (is_network_error(mysql_errno(mysql)))
1144 mi->report(WARNING_LEVEL, mysql_errno(mysql),
1145 "Get master COLLATION_SERVER failed with error: %s", mysql_error(mysql));
1146 goto network_err;
1148 else if (mysql_errno(mysql) != ER_UNKNOWN_SYSTEM_VARIABLE)
1150 /* Fatal error */
1151 errmsg= "The slave I/O thread stops because a fatal error is encountered \
1152 when it try to get the value of COLLATION_SERVER global variable from master.";
1153 err_code= mysql_errno(mysql);
1154 sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
1155 goto err;
1157 else
1158 mi->report(WARNING_LEVEL, ER_UNKNOWN_SYSTEM_VARIABLE,
1159 "Unknown system variable 'COLLATION_SERVER' on master, \
1160 maybe it is a *VERY OLD MASTER*. *NOTE*: slave may experience \
1161 inconsistency if replicated data deals with collation.");
1163 if (master_res)
1165 mysql_free_result(master_res);
1166 master_res= NULL;
1171 Perform analogous check for time zone. Theoretically we also should
1172 perform check here to verify that SYSTEM time zones are the same on
1173 slave and master, but we can't rely on value of @@system_time_zone
1174 variable (it is time zone abbreviation) since it determined at start
1175 time and so could differ for slave and master even if they are really
1176 in the same system time zone. So we are omitting this check and just
1177 relying on documentation. Also according to Monty there are many users
1178 who are using replication between servers in various time zones. Hence
1179 such a check would break everything for them. (And now everything will
1180 work for them because by default both their master and slave will have
1181 'SYSTEM' time zone).
1182 This check is only necessary for 4.x masters (and < 5.0.4 masters but
1183 those were alpha).
1185 if (*mysql->server_version == '4')
1187 master_res= NULL;
1188 if (!mysql_real_query(mysql, STRING_WITH_LEN("SELECT @@GLOBAL.TIME_ZONE")) &&
1189 (master_res= mysql_store_result(mysql)) &&
1190 (master_row= mysql_fetch_row(master_res)))
1192 if (strcmp(master_row[0],
1193 global_system_variables.time_zone->get_name()->ptr()))
1195 errmsg= "The slave I/O thread stops because master and slave have \
1196 different values for the TIME_ZONE global variable. The values must \
1197 be equal for the Statement-format replication to work";
1198 err_code= ER_SLAVE_FATAL_ERROR;
1199 sprintf(err_buff, ER(err_code), errmsg);
1200 goto err;
1203 else if (is_network_error(mysql_errno(mysql)))
1205 mi->report(WARNING_LEVEL, mysql_errno(mysql),
1206 "Get master TIME_ZONE failed with error: %s", mysql_error(mysql));
1207 goto network_err;
1209 else
1211 /* Fatal error */
1212 errmsg= "The slave I/O thread stops because a fatal error is encountered \
1213 when it try to get the value of TIME_ZONE global variable from master.";
1214 err_code= mysql_errno(mysql);
1215 sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
1216 goto err;
1218 if (master_res)
1220 mysql_free_result(master_res);
1221 master_res= NULL;
1225 err:
1226 if (errmsg)
1228 if (master_res)
1229 mysql_free_result(master_res);
1230 DBUG_ASSERT(err_code != 0);
1231 mi->report(ERROR_LEVEL, err_code, "%s", err_buff);
1232 DBUG_RETURN(1);
1235 DBUG_RETURN(0);
1237 network_err:
1238 if (master_res)
1239 mysql_free_result(master_res);
1240 DBUG_RETURN(2);
1244 Used by fetch_master_table (used by LOAD TABLE tblname FROM MASTER and LOAD
1245 DATA FROM MASTER). Drops the table (if 'overwrite' is true) and recreates it
1246 from the dump. Honours replication inclusion/exclusion rules.
1247 db must be non-zero (guarded by assertion).
1249 RETURN VALUES
1250 0 success
1251 1 error
1254 static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db,
1255 const char* table_name, bool overwrite)
1257 ulong packet_len;
1258 char *query, *save_db;
1259 uint32 save_db_length;
1260 Vio* save_vio;
1261 HA_CHECK_OPT check_opt;
1262 TABLE_LIST tables;
1263 int error= 1;
1264 handler *file;
1265 ulonglong save_options;
1266 NET *net= &mysql->net;
1267 const char *found_semicolon= NULL;
1268 DBUG_ENTER("create_table_from_dump");
1270 packet_len= my_net_read(net); // read create table statement
1271 if (packet_len == packet_error)
1273 my_message(ER_MASTER_NET_READ, ER(ER_MASTER_NET_READ), MYF(0));
1274 DBUG_RETURN(1);
1276 if (net->read_pos[0] == 255) // error from master
1278 char *err_msg;
1279 err_msg= (char*) net->read_pos + ((mysql->server_capabilities &
1280 CLIENT_PROTOCOL_41) ?
1281 3+SQLSTATE_LENGTH+1 : 3);
1282 my_error(ER_MASTER, MYF(0), err_msg);
1283 DBUG_RETURN(1);
1285 thd->command = COM_TABLE_DUMP;
1286 if (!(query = thd->strmake((char*) net->read_pos, packet_len)))
1288 sql_print_error("create_table_from_dump: out of memory");
1289 my_message(ER_GET_ERRNO, "Out of memory", MYF(0));
1290 DBUG_RETURN(1);
1292 thd->set_query(query, packet_len);
1293 thd->is_slave_error = 0;
1295 bzero((char*) &tables,sizeof(tables));
1296 tables.db = (char*)db;
1297 tables.alias= tables.table_name= (char*)table_name;
1299 /* Drop the table if 'overwrite' is true */
1300 if (overwrite)
1302 if (mysql_rm_table(thd,&tables,1,0)) /* drop if exists */
1304 sql_print_error("create_table_from_dump: failed to drop the table");
1305 goto err;
1307 else
1309 /* Clear the OK result of mysql_rm_table(). */
1310 thd->main_da.reset_diagnostics_area();
1314 /* Create the table. We do not want to log the "create table" statement */
1315 save_options = thd->options;
1316 thd->options &= ~ (OPTION_BIN_LOG);
1317 thd_proc_info(thd, "Creating table from master dump");
1318 // save old db in case we are creating in a different database
1319 save_db = thd->db;
1320 save_db_length= thd->db_length;
1321 thd->db = (char*)db;
1322 DBUG_ASSERT(thd->db != 0);
1323 thd->db_length= strlen(thd->db);
1324 /* run create table */
1325 mysql_parse(thd, thd->query(), packet_len, &found_semicolon);
1326 thd->db = save_db; // leave things the way the were before
1327 thd->db_length= save_db_length;
1328 thd->options = save_options;
1330 if (thd->is_slave_error)
1331 goto err; // mysql_parse took care of the error send
1333 thd_proc_info(thd, "Opening master dump table");
1334 thd->main_da.reset_diagnostics_area(); /* cleanup from CREATE_TABLE */
1336 Note: If this function starts to fail for MERGE tables,
1337 change the next two lines to these:
1338 tables.table= NULL; // was set by mysql_rm_table()
1339 if (!open_n_lock_single_table(thd, &tables, TL_WRITE))
1341 tables.lock_type = TL_WRITE;
1342 if (!open_ltable(thd, &tables, TL_WRITE, 0))
1344 sql_print_error("create_table_from_dump: could not open created table");
1345 goto err;
1348 file = tables.table->file;
1349 thd_proc_info(thd, "Reading master dump table data");
1350 /* Copy the data file */
1351 if (file->net_read_dump(net))
1353 my_message(ER_MASTER_NET_READ, ER(ER_MASTER_NET_READ), MYF(0));
1354 sql_print_error("create_table_from_dump: failed in\
1355 handler::net_read_dump()");
1356 goto err;
1359 check_opt.init();
1360 check_opt.flags|= T_VERY_SILENT | T_CALC_CHECKSUM | T_QUICK;
1361 thd_proc_info(thd, "Rebuilding the index on master dump table");
1363 We do not want repair() to spam us with messages
1364 just send them to the error log, and report the failure in case of
1365 problems.
1367 save_vio = thd->net.vio;
1368 thd->net.vio = 0;
1369 /* Rebuild the index file from the copied data file (with REPAIR) */
1370 error=file->ha_repair(thd,&check_opt) != 0;
1371 thd->net.vio = save_vio;
1372 if (error)
1373 my_error(ER_INDEX_REBUILD, MYF(0), tables.table->s->table_name.str);
1375 err:
1376 close_thread_tables(thd);
1377 DBUG_RETURN(error);
1381 int fetch_master_table(THD *thd, const char *db_name, const char *table_name,
1382 Master_info *mi, MYSQL *mysql, bool overwrite)
1384 int error= 1;
1385 const char *errmsg=0;
1386 bool called_connected= (mysql != NULL);
1387 DBUG_ENTER("fetch_master_table");
1388 DBUG_PRINT("enter", ("db_name: '%s' table_name: '%s'",
1389 db_name,table_name));
1391 if (!called_connected)
1393 if (!(mysql = mysql_init(NULL)))
1395 DBUG_RETURN(1);
1397 if (connect_to_master(thd, mysql, mi))
1399 my_error(ER_CONNECT_TO_MASTER, MYF(0), mysql_error(mysql));
1401 We need to clear the active VIO since, theoretically, somebody
1402 might issue an awake() on this thread. If we are then in the
1403 middle of closing and destroying the VIO inside the
1404 mysql_close(), we will have a problem.
1406 #ifdef SIGNAL_WITH_VIO_CLOSE
1407 thd->clear_active_vio();
1408 #endif
1409 mysql_close(mysql);
1410 DBUG_RETURN(1);
1412 if (thd->killed)
1413 goto err;
1416 if (request_table_dump(mysql, db_name, table_name))
1418 error= ER_UNKNOWN_ERROR;
1419 errmsg= "Failed on table dump request";
1420 goto err;
1422 if (create_table_from_dump(thd, mysql, db_name,
1423 table_name, overwrite))
1424 goto err; // create_table_from_dump have sent the error already
1425 error = 0;
1427 err:
1428 if (!called_connected)
1429 mysql_close(mysql);
1430 if (errmsg && thd->vio_ok())
1431 my_message(error, errmsg, MYF(0));
1432 DBUG_RETURN(test(error)); // Return 1 on error
1436 static bool wait_for_relay_log_space(Relay_log_info* rli)
1438 bool slave_killed=0;
1439 Master_info* mi = rli->mi;
1440 const char *save_proc_info;
1441 THD* thd = mi->io_thd;
1442 DBUG_ENTER("wait_for_relay_log_space");
1444 pthread_mutex_lock(&rli->log_space_lock);
1445 save_proc_info= thd->enter_cond(&rli->log_space_cond,
1446 &rli->log_space_lock,
1448 Waiting for the slave SQL thread to free enough relay log space");
1449 while (rli->log_space_limit < rli->log_space_total &&
1450 !(slave_killed=io_slave_killed(thd,mi)) &&
1451 !rli->ignore_log_space_limit)
1452 pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
1455 Makes the IO thread read only one event at a time
1456 until the SQL thread is able to purge the relay
1457 logs, freeing some space.
1459 Therefore, once the SQL thread processes this next
1460 event, it goes to sleep (no more events in the queue),
1461 sets ignore_log_space_limit=true and wakes the IO thread.
1462 However, this event may have been enough already for
1463 the SQL thread to purge some log files, freeing
1464 rli->log_space_total .
1466 This guarantees that the SQL and IO thread move
1467 forward only one event at a time (to avoid deadlocks),
1468 when the relay space limit is reached. It also
1469 guarantees that when the SQL thread is prepared to
1470 rotate (to be able to purge some logs), the IO thread
1471 will know about it and will rotate.
1473 NOTE: The ignore_log_space_limit is only set when the SQL
1474 thread sleeps waiting for events.
1477 if (rli->ignore_log_space_limit)
1479 #ifndef DBUG_OFF
1481 char llbuf1[22], llbuf2[22];
1482 DBUG_PRINT("info", ("log_space_limit=%s "
1483 "log_space_total=%s "
1484 "ignore_log_space_limit=%d "
1485 "sql_force_rotate_relay=%d",
1486 llstr(rli->log_space_limit,llbuf1),
1487 llstr(rli->log_space_total,llbuf2),
1488 (int) rli->ignore_log_space_limit,
1489 (int) rli->sql_force_rotate_relay));
1491 #endif
1492 if (rli->sql_force_rotate_relay)
1494 rotate_relay_log(rli->mi);
1495 rli->sql_force_rotate_relay= false;
1498 rli->ignore_log_space_limit= false;
1501 thd->exit_cond(save_proc_info);
1502 DBUG_RETURN(slave_killed);
1507 Builds a Rotate from the ignored events' info and writes it to relay log.
1509 SYNOPSIS
1510 write_ignored_events_info_to_relay_log()
1511 thd pointer to I/O thread's thd
1514 DESCRIPTION
1515 Slave I/O thread, going to die, must leave a durable trace of the
1516 ignored events' end position for the use of the slave SQL thread, by
1517 calling this function. Only that thread can call it (see assertion).
1519 static void write_ignored_events_info_to_relay_log(THD *thd, Master_info *mi)
1521 Relay_log_info *rli= &mi->rli;
1522 pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
1523 DBUG_ENTER("write_ignored_events_info_to_relay_log");
1525 DBUG_ASSERT(thd == mi->io_thd);
1526 pthread_mutex_lock(log_lock);
1527 if (rli->ign_master_log_name_end[0])
1529 DBUG_PRINT("info",("writing a Rotate event to track down ignored events"));
1530 Rotate_log_event *ev= new Rotate_log_event(rli->ign_master_log_name_end,
1531 0, rli->ign_master_log_pos_end,
1532 Rotate_log_event::DUP_NAME);
1533 rli->ign_master_log_name_end[0]= 0;
1534 /* can unlock before writing as slave SQL thd will soon see our Rotate */
1535 pthread_mutex_unlock(log_lock);
1536 if (likely((bool)ev))
1538 ev->server_id= 0; // don't be ignored by slave SQL thread
1539 if (unlikely(rli->relay_log.append(ev)))
1540 mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
1541 ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
1542 "failed to write a Rotate event"
1543 " to the relay log, SHOW SLAVE STATUS may be"
1544 " inaccurate");
1545 rli->relay_log.harvest_bytes_written(&rli->log_space_total);
1546 if (flush_master_info(mi, TRUE, TRUE))
1547 sql_print_error("Failed to flush master info file");
1548 delete ev;
1550 else
1551 mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE,
1552 ER(ER_SLAVE_CREATE_EVENT_FAILURE),
1553 "Rotate_event (out of memory?),"
1554 " SHOW SLAVE STATUS may be inaccurate");
1556 else
1557 pthread_mutex_unlock(log_lock);
1558 DBUG_VOID_RETURN;
1562 int register_slave_on_master(MYSQL* mysql, Master_info *mi,
1563 bool *suppress_warnings)
1565 uchar buf[1024], *pos= buf;
1566 uint report_host_len, report_user_len=0, report_password_len=0;
1567 DBUG_ENTER("register_slave_on_master");
1569 *suppress_warnings= FALSE;
1570 if (!report_host)
1571 DBUG_RETURN(0);
1572 report_host_len= strlen(report_host);
1573 if (report_user)
1574 report_user_len= strlen(report_user);
1575 if (report_password)
1576 report_password_len= strlen(report_password);
1577 /* 30 is a good safety margin */
1578 if (report_host_len + report_user_len + report_password_len + 30 >
1579 sizeof(buf))
1580 DBUG_RETURN(0); // safety
1582 int4store(pos, server_id); pos+= 4;
1583 pos= net_store_data(pos, (uchar*) report_host, report_host_len);
1584 pos= net_store_data(pos, (uchar*) report_user, report_user_len);
1585 pos= net_store_data(pos, (uchar*) report_password, report_password_len);
1586 int2store(pos, (uint16) report_port); pos+= 2;
1587 int4store(pos, rpl_recovery_rank); pos+= 4;
1588 /* The master will fill in master_id */
1589 int4store(pos, 0); pos+= 4;
1591 if (simple_command(mysql, COM_REGISTER_SLAVE, buf, (size_t) (pos- buf), 0))
1593 if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
1595 *suppress_warnings= TRUE; // Suppress reconnect warning
1597 else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
1599 char buf[256];
1600 my_snprintf(buf, sizeof(buf), "%s (Errno: %d)", mysql_error(mysql),
1601 mysql_errno(mysql));
1602 mi->report(ERROR_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
1603 ER(ER_SLAVE_MASTER_COM_FAILURE), "COM_REGISTER_SLAVE", buf);
1605 DBUG_RETURN(1);
1607 DBUG_RETURN(0);
1612 Execute a SHOW SLAVE STATUS statement.
1614 @param thd Pointer to THD object for the client thread executing the
1615 statement.
1617 @param mi Pointer to Master_info object for the IO thread.
1619 @retval FALSE success
1620 @retval TRUE failure
1622 bool show_master_info(THD* thd, Master_info* mi)
1624 // TODO: fix this for multi-master
1625 List<Item> field_list;
1626 Protocol *protocol= thd->protocol;
1627 DBUG_ENTER("show_master_info");
1629 field_list.push_back(new Item_empty_string("Slave_IO_State",
1630 14));
1631 field_list.push_back(new Item_empty_string("Master_Host",
1632 sizeof(mi->host)));
1633 field_list.push_back(new Item_empty_string("Master_User",
1634 sizeof(mi->user)));
1635 field_list.push_back(new Item_return_int("Master_Port", 7,
1636 MYSQL_TYPE_LONG));
1637 field_list.push_back(new Item_return_int("Connect_Retry", 10,
1638 MYSQL_TYPE_LONG));
1639 field_list.push_back(new Item_empty_string("Master_Log_File",
1640 FN_REFLEN));
1641 field_list.push_back(new Item_return_int("Read_Master_Log_Pos", 10,
1642 MYSQL_TYPE_LONGLONG));
1643 field_list.push_back(new Item_empty_string("Relay_Log_File",
1644 FN_REFLEN));
1645 field_list.push_back(new Item_return_int("Relay_Log_Pos", 10,
1646 MYSQL_TYPE_LONGLONG));
1647 field_list.push_back(new Item_empty_string("Relay_Master_Log_File",
1648 FN_REFLEN));
1649 field_list.push_back(new Item_empty_string("Slave_IO_Running", 3));
1650 field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3));
1651 field_list.push_back(new Item_empty_string("Replicate_Do_DB", 20));
1652 field_list.push_back(new Item_empty_string("Replicate_Ignore_DB", 20));
1653 field_list.push_back(new Item_empty_string("Replicate_Do_Table", 20));
1654 field_list.push_back(new Item_empty_string("Replicate_Ignore_Table", 23));
1655 field_list.push_back(new Item_empty_string("Replicate_Wild_Do_Table", 24));
1656 field_list.push_back(new Item_empty_string("Replicate_Wild_Ignore_Table",
1657 28));
1658 field_list.push_back(new Item_return_int("Last_Errno", 4, MYSQL_TYPE_LONG));
1659 field_list.push_back(new Item_empty_string("Last_Error", 20));
1660 field_list.push_back(new Item_return_int("Skip_Counter", 10,
1661 MYSQL_TYPE_LONG));
1662 field_list.push_back(new Item_return_int("Exec_Master_Log_Pos", 10,
1663 MYSQL_TYPE_LONGLONG));
1664 field_list.push_back(new Item_return_int("Relay_Log_Space", 10,
1665 MYSQL_TYPE_LONGLONG));
1666 field_list.push_back(new Item_empty_string("Until_Condition", 6));
1667 field_list.push_back(new Item_empty_string("Until_Log_File", FN_REFLEN));
1668 field_list.push_back(new Item_return_int("Until_Log_Pos", 10,
1669 MYSQL_TYPE_LONGLONG));
1670 field_list.push_back(new Item_empty_string("Master_SSL_Allowed", 7));
1671 field_list.push_back(new Item_empty_string("Master_SSL_CA_File",
1672 sizeof(mi->ssl_ca)));
1673 field_list.push_back(new Item_empty_string("Master_SSL_CA_Path",
1674 sizeof(mi->ssl_capath)));
1675 field_list.push_back(new Item_empty_string("Master_SSL_Cert",
1676 sizeof(mi->ssl_cert)));
1677 field_list.push_back(new Item_empty_string("Master_SSL_Cipher",
1678 sizeof(mi->ssl_cipher)));
1679 field_list.push_back(new Item_empty_string("Master_SSL_Key",
1680 sizeof(mi->ssl_key)));
1681 field_list.push_back(new Item_return_int("Seconds_Behind_Master", 10,
1682 MYSQL_TYPE_LONGLONG));
1683 field_list.push_back(new Item_empty_string("Master_SSL_Verify_Server_Cert",
1684 3));
1685 field_list.push_back(new Item_return_int("Last_IO_Errno", 4, MYSQL_TYPE_LONG));
1686 field_list.push_back(new Item_empty_string("Last_IO_Error", 20));
1687 field_list.push_back(new Item_return_int("Last_SQL_Errno", 4, MYSQL_TYPE_LONG));
1688 field_list.push_back(new Item_empty_string("Last_SQL_Error", 20));
1690 if (protocol->send_fields(&field_list,
1691 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1692 DBUG_RETURN(TRUE);
1694 if (mi->host[0])
1696 DBUG_PRINT("info",("host is set: '%s'", mi->host));
1697 String *packet= &thd->packet;
1698 protocol->prepare_for_resend();
1701 slave_running can be accessed without run_lock but not other
1702 non-volotile members like mi->io_thd, which is guarded by the mutex.
1704 pthread_mutex_lock(&mi->run_lock);
1705 protocol->store(mi->io_thd ? mi->io_thd->proc_info : "", &my_charset_bin);
1706 pthread_mutex_unlock(&mi->run_lock);
1708 pthread_mutex_lock(&mi->data_lock);
1709 pthread_mutex_lock(&mi->rli.data_lock);
1710 pthread_mutex_lock(&mi->err_lock);
1711 pthread_mutex_lock(&mi->rli.err_lock);
1712 protocol->store(mi->host, &my_charset_bin);
1713 protocol->store(mi->user, &my_charset_bin);
1714 protocol->store((uint32) mi->port);
1715 protocol->store((uint32) mi->connect_retry);
1716 protocol->store(mi->master_log_name, &my_charset_bin);
1717 protocol->store((ulonglong) mi->master_log_pos);
1718 protocol->store(mi->rli.group_relay_log_name +
1719 dirname_length(mi->rli.group_relay_log_name),
1720 &my_charset_bin);
1721 protocol->store((ulonglong) mi->rli.group_relay_log_pos);
1722 protocol->store(mi->rli.group_master_log_name, &my_charset_bin);
1723 protocol->store(mi->slave_running == MYSQL_SLAVE_RUN_CONNECT ?
1724 "Yes" : "No", &my_charset_bin);
1725 protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
1726 protocol->store(rpl_filter->get_do_db());
1727 protocol->store(rpl_filter->get_ignore_db());
1729 char buf[256];
1730 String tmp(buf, sizeof(buf), &my_charset_bin);
1731 rpl_filter->get_do_table(&tmp);
1732 protocol->store(&tmp);
1733 rpl_filter->get_ignore_table(&tmp);
1734 protocol->store(&tmp);
1735 rpl_filter->get_wild_do_table(&tmp);
1736 protocol->store(&tmp);
1737 rpl_filter->get_wild_ignore_table(&tmp);
1738 protocol->store(&tmp);
1740 protocol->store(mi->rli.last_error().number);
1741 protocol->store(mi->rli.last_error().message, &my_charset_bin);
1742 protocol->store((uint32) mi->rli.slave_skip_counter);
1743 protocol->store((ulonglong) mi->rli.group_master_log_pos);
1744 protocol->store((ulonglong) mi->rli.log_space_total);
1746 protocol->store(
1747 mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None":
1748 ( mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master":
1749 "Relay"), &my_charset_bin);
1750 protocol->store(mi->rli.until_log_name, &my_charset_bin);
1751 protocol->store((ulonglong) mi->rli.until_log_pos);
1753 #ifdef HAVE_OPENSSL
1754 protocol->store(mi->ssl? "Yes":"No", &my_charset_bin);
1755 #else
1756 protocol->store(mi->ssl? "Ignored":"No", &my_charset_bin);
1757 #endif
1758 protocol->store(mi->ssl_ca, &my_charset_bin);
1759 protocol->store(mi->ssl_capath, &my_charset_bin);
1760 protocol->store(mi->ssl_cert, &my_charset_bin);
1761 protocol->store(mi->ssl_cipher, &my_charset_bin);
1762 protocol->store(mi->ssl_key, &my_charset_bin);
1765 Seconds_Behind_Master: if SQL thread is running and I/O thread is
1766 connected, we can compute it otherwise show NULL (i.e. unknown).
1768 if ((mi->slave_running == MYSQL_SLAVE_RUN_CONNECT) &&
1769 mi->rli.slave_running)
1771 long time_diff= ((long)(time(0) - mi->rli.last_master_timestamp)
1772 - mi->clock_diff_with_master);
1774 Apparently on some systems time_diff can be <0. Here are possible
1775 reasons related to MySQL:
1776 - the master is itself a slave of another master whose time is ahead.
1777 - somebody used an explicit SET TIMESTAMP on the master.
1778 Possible reason related to granularity-to-second of time functions
1779 (nothing to do with MySQL), which can explain a value of -1:
1780 assume the master's and slave's time are perfectly synchronized, and
1781 that at slave's connection time, when the master's timestamp is read,
1782 it is at the very end of second 1, and (a very short time later) when
1783 the slave's timestamp is read it is at the very beginning of second
1784 2. Then the recorded value for master is 1 and the recorded value for
1785 slave is 2. At SHOW SLAVE STATUS time, assume that the difference
1786 between timestamp of slave and rli->last_master_timestamp is 0
1787 (i.e. they are in the same second), then we get 0-(2-1)=-1 as a result.
1788 This confuses users, so we don't go below 0: hence the max().
1790 last_master_timestamp == 0 (an "impossible" timestamp 1970) is a
1791 special marker to say "consider we have caught up".
1793 protocol->store((longlong)(mi->rli.last_master_timestamp ?
1794 max(0, time_diff) : 0));
1796 else
1798 protocol->store_null();
1800 protocol->store(mi->ssl_verify_server_cert? "Yes":"No", &my_charset_bin);
1802 // Last_IO_Errno
1803 protocol->store(mi->last_error().number);
1804 // Last_IO_Error
1805 protocol->store(mi->last_error().message, &my_charset_bin);
1806 // Last_SQL_Errno
1807 protocol->store(mi->rli.last_error().number);
1808 // Last_SQL_Error
1809 protocol->store(mi->rli.last_error().message, &my_charset_bin);
1811 pthread_mutex_unlock(&mi->rli.err_lock);
1812 pthread_mutex_unlock(&mi->err_lock);
1813 pthread_mutex_unlock(&mi->rli.data_lock);
1814 pthread_mutex_unlock(&mi->data_lock);
1816 if (my_net_write(&thd->net, (uchar*) thd->packet.ptr(), packet->length()))
1817 DBUG_RETURN(TRUE);
1819 my_eof(thd);
1820 DBUG_RETURN(FALSE);
1824 void set_slave_thread_options(THD* thd)
1826 DBUG_ENTER("set_slave_thread_options");
1828 It's nonsense to constrain the slave threads with max_join_size; if a
1829 query succeeded on master, we HAVE to execute it. So set
1830 OPTION_BIG_SELECTS. Setting max_join_size to HA_POS_ERROR is not enough
1831 (and it's not needed if we have OPTION_BIG_SELECTS) because an INSERT
1832 SELECT examining more than 4 billion rows would still fail (yes, because
1833 when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
1834 only for client threads.
1836 ulonglong options= thd->options | OPTION_BIG_SELECTS;
1837 if (opt_log_slave_updates)
1838 options|= OPTION_BIN_LOG;
1839 else
1840 options&= ~OPTION_BIN_LOG;
1841 thd->options= options;
1842 thd->variables.completion_type= 0;
1843 DBUG_VOID_RETURN;
1846 void set_slave_thread_default_charset(THD* thd, Relay_log_info const *rli)
1848 DBUG_ENTER("set_slave_thread_default_charset");
1850 thd->variables.character_set_client=
1851 global_system_variables.character_set_client;
1852 thd->variables.collation_connection=
1853 global_system_variables.collation_connection;
1854 thd->variables.collation_server=
1855 global_system_variables.collation_server;
1856 thd->update_charset();
1859 We use a const cast here since the conceptual (and externally
1860 visible) behavior of the function is to set the default charset of
1861 the thread. That the cache has to be invalidated is a secondary
1862 effect.
1864 const_cast<Relay_log_info*>(rli)->cached_charset_invalidate();
1865 DBUG_VOID_RETURN;
1869 init_slave_thread()
1872 static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
1874 DBUG_ENTER("init_slave_thread");
1875 #if !defined(DBUG_OFF)
1876 int simulate_error= 0;
1877 #endif
1878 thd->system_thread = (thd_type == SLAVE_THD_SQL) ?
1879 SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
1880 thd->security_ctx->skip_grants();
1881 my_net_init(&thd->net, 0);
1883 Adding MAX_LOG_EVENT_HEADER_LEN to the max_allowed_packet on all
1884 slave threads, since a replication event can become this much larger
1885 than the corresponding packet (query) sent from client to master.
1887 thd->variables.max_allowed_packet= slave_max_allowed_packet;
1888 thd->slave_thread = 1;
1889 thd->enable_slow_log= opt_log_slow_slave_statements;
1890 set_slave_thread_options(thd);
1891 thd->client_capabilities = CLIENT_LOCAL_FILES;
1892 pthread_mutex_lock(&LOCK_thread_count);
1893 thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
1894 pthread_mutex_unlock(&LOCK_thread_count);
1896 DBUG_EXECUTE_IF("simulate_io_slave_error_on_init",
1897 simulate_error|= (1 << SLAVE_THD_IO););
1898 DBUG_EXECUTE_IF("simulate_sql_slave_error_on_init",
1899 simulate_error|= (1 << SLAVE_THD_SQL););
1900 #if !defined(DBUG_OFF)
1901 if (init_thr_lock() || thd->store_globals() || simulate_error & (1<< thd_type))
1902 #else
1903 if (init_thr_lock() || thd->store_globals())
1904 #endif
1906 thd->cleanup();
1907 DBUG_RETURN(-1);
1909 lex_start(thd);
1911 if (thd_type == SLAVE_THD_SQL)
1912 thd_proc_info(thd, "Waiting for the next event in relay log");
1913 else
1914 thd_proc_info(thd, "Waiting for master update");
1915 thd->version=refresh_version;
1916 thd->set_time();
1917 DBUG_RETURN(0);
1921 static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
1922 void* thread_killed_arg)
1924 int nap_time;
1925 thr_alarm_t alarmed;
1926 DBUG_ENTER("safe_sleep");
1928 thr_alarm_init(&alarmed);
1929 time_t start_time= my_time(0);
1930 time_t end_time= start_time+sec;
1932 while ((nap_time= (int) (end_time - start_time)) > 0)
1934 ALARM alarm_buff;
1936 The only reason we are asking for alarm is so that
1937 we will be woken up in case of murder, so if we do not get killed,
1938 set the alarm so it goes off after we wake up naturally
1940 thr_alarm(&alarmed, 2 * nap_time, &alarm_buff);
1941 sleep(nap_time);
1942 thr_end_alarm(&alarmed);
1944 if ((*thread_killed)(thd,thread_killed_arg))
1945 DBUG_RETURN(1);
1946 start_time= my_time(0);
1948 DBUG_RETURN(0);
1952 static int request_dump(MYSQL* mysql, Master_info* mi,
1953 bool *suppress_warnings)
1955 uchar buf[FN_REFLEN + 10];
1956 int len;
1957 int binlog_flags = 0; // for now
1958 char* logname = mi->master_log_name;
1959 DBUG_ENTER("request_dump");
1961 *suppress_warnings= FALSE;
1963 // TODO if big log files: Change next to int8store()
1964 int4store(buf, (ulong) mi->master_log_pos);
1965 int2store(buf + 4, binlog_flags);
1966 int4store(buf + 6, server_id);
1967 len = (uint) strlen(logname);
1968 memcpy(buf + 10, logname,len);
1969 if (simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1))
1972 Something went wrong, so we will just reconnect and retry later
1973 in the future, we should do a better error analysis, but for
1974 now we just fill up the error log :-)
1976 if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
1977 *suppress_warnings= TRUE; // Suppress reconnect warning
1978 else
1979 sql_print_error("Error on COM_BINLOG_DUMP: %d %s, will retry in %d secs",
1980 mysql_errno(mysql), mysql_error(mysql),
1981 master_connect_retry);
1982 DBUG_RETURN(1);
1985 DBUG_RETURN(0);
1989 static int request_table_dump(MYSQL* mysql, const char* db, const char* table)
1991 uchar buf[1024], *p = buf;
1992 DBUG_ENTER("request_table_dump");
1994 uint table_len = (uint) strlen(table);
1995 uint db_len = (uint) strlen(db);
1996 if (table_len + db_len > sizeof(buf) - 2)
1998 sql_print_error("request_table_dump: Buffer overrun");
1999 DBUG_RETURN(1);
2002 *p++ = db_len;
2003 memcpy(p, db, db_len);
2004 p += db_len;
2005 *p++ = table_len;
2006 memcpy(p, table, table_len);
2008 if (simple_command(mysql, COM_TABLE_DUMP, buf, p - buf + table_len, 1))
2010 sql_print_error("request_table_dump: Error sending the table dump \
2011 command");
2012 DBUG_RETURN(1);
2015 DBUG_RETURN(0);
2020 Read one event from the master
2022 SYNOPSIS
2023 read_event()
2024 mysql MySQL connection
2025 mi Master connection information
2026 suppress_warnings TRUE when a normal net read timeout has caused us to
2027 try a reconnect. We do not want to print anything to
2028 the error log in this case because this a anormal
2029 event in an idle server.
2031 RETURN VALUES
2032 'packet_error' Error
2033 number Length of packet
2036 static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings)
2038 ulong len;
2039 DBUG_ENTER("read_event");
2041 *suppress_warnings= FALSE;
2043 my_real_read() will time us out
2044 We check if we were told to die, and if not, try reading again
2046 #ifndef DBUG_OFF
2047 if (disconnect_slave_event_count && !(mi->events_till_disconnect--))
2048 DBUG_RETURN(packet_error);
2049 #endif
2051 len = cli_safe_read(mysql);
2052 if (len == packet_error || (long) len < 1)
2054 if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
2057 We are trying a normal reconnect after a read timeout;
2058 we suppress prints to .err file as long as the reconnect
2059 happens without problems
2061 *suppress_warnings= TRUE;
2063 else
2064 sql_print_error("Error reading packet from server: %s ( server_errno=%d)",
2065 mysql_error(mysql), mysql_errno(mysql));
2066 DBUG_RETURN(packet_error);
2069 /* Check if eof packet */
2070 if (len < 8 && mysql->net.read_pos[0] == 254)
2072 sql_print_information("Slave: received end packet from server, apparent "
2073 "master shutdown: %s",
2074 mysql_error(mysql));
2075 DBUG_RETURN(packet_error);
2078 DBUG_PRINT("exit", ("len: %lu net->read_pos[4]: %d",
2079 len, mysql->net.read_pos[4]));
2080 DBUG_RETURN(len - 1);
2084 Check if the current error is of temporary nature of not.
2085 Some errors are temporary in nature, such as
2086 ER_LOCK_DEADLOCK and ER_LOCK_WAIT_TIMEOUT. Ndb also signals
2087 that the error is temporary by pushing a warning with the error code
2088 ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.
2090 static int has_temporary_error(THD *thd)
2092 DBUG_ENTER("has_temporary_error");
2094 DBUG_EXECUTE_IF("all_errors_are_temporary_errors",
2095 if (thd->main_da.is_error())
2097 thd->clear_error();
2098 my_error(ER_LOCK_DEADLOCK, MYF(0));
2102 If there is no message in THD, we can't say if it's a temporary
2103 error or not. This is currently the case for Incident_log_event,
2104 which sets no message. Return FALSE.
2106 if (!thd->is_error())
2107 DBUG_RETURN(0);
2110 Temporary error codes:
2111 currently, InnoDB deadlock detected by InnoDB or lock
2112 wait timeout (innodb_lock_wait_timeout exceeded
2114 if (thd->main_da.sql_errno() == ER_LOCK_DEADLOCK ||
2115 thd->main_da.sql_errno() == ER_LOCK_WAIT_TIMEOUT)
2116 DBUG_RETURN(1);
2118 #ifdef HAVE_NDB_BINLOG
2120 currently temporary error set in ndbcluster
2122 List_iterator_fast<MYSQL_ERROR> it(thd->warn_list);
2123 MYSQL_ERROR *err;
2124 while ((err= it++))
2126 DBUG_PRINT("info", ("has warning %d %s", err->code, err->msg));
2127 switch (err->code)
2129 case ER_GET_TEMPORARY_ERRMSG:
2130 DBUG_RETURN(1);
2131 default:
2132 break;
2135 #endif
2136 DBUG_RETURN(0);
2141 Applies the given event and advances the relay log position.
2143 In essence, this function does:
2145 @code
2146 ev->apply_event(rli);
2147 ev->update_pos(rli);
2148 @endcode
2150 But it also does some maintainance, such as skipping events if
2151 needed and reporting errors.
2153 If the @c skip flag is set, then it is tested whether the event
2154 should be skipped, by looking at the slave_skip_counter and the
2155 server id. The skip flag should be set when calling this from a
2156 replication thread but not set when executing an explicit BINLOG
2157 statement.
2159 @retval 0 OK.
2161 @retval 1 Error calling ev->apply_event().
2163 @retval 2 No error calling ev->apply_event(), but error calling
2164 ev->update_pos().
2166 int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli)
2168 int exec_res= 0;
2170 DBUG_ENTER("apply_event_and_update_pos");
2172 DBUG_PRINT("exec_event",("%s(type_code: %d; server_id: %d)",
2173 ev->get_type_str(), ev->get_type_code(),
2174 ev->server_id));
2175 DBUG_PRINT("info", ("thd->options: %s%s; rli->last_event_start_time: %lu",
2176 FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT),
2177 FLAGSTR(thd->options, OPTION_BEGIN),
2178 (ulong) rli->last_event_start_time));
2181 Execute the event to change the database and update the binary
2182 log coordinates, but first we set some data that is needed for
2183 the thread.
2185 The event will be executed unless it is supposed to be skipped.
2187 Queries originating from this server must be skipped. Low-level
2188 events (Format_description_log_event, Rotate_log_event,
2189 Stop_log_event) from this server must also be skipped. But for
2190 those we don't want to modify 'group_master_log_pos', because
2191 these events did not exist on the master.
2192 Format_description_log_event is not completely skipped.
2194 Skip queries specified by the user in 'slave_skip_counter'. We
2195 can't however skip events that has something to do with the log
2196 files themselves.
2198 Filtering on own server id is extremely important, to ignore
2199 execution of events created by the creation/rotation of the relay
2200 log (remember that now the relay log starts with its Format_desc,
2201 has a Rotate etc).
2204 thd->server_id = ev->server_id; // use the original server id for logging
2205 thd->set_time(); // time the query
2206 thd->lex->current_select= 0;
2207 if (!ev->when)
2208 ev->when= my_time(0);
2209 ev->thd = thd; // because up to this point, ev->thd == 0
2211 int reason= ev->shall_skip(rli);
2212 if (reason == Log_event::EVENT_SKIP_COUNT)
2213 --rli->slave_skip_counter;
2214 pthread_mutex_unlock(&rli->data_lock);
2215 if (reason == Log_event::EVENT_SKIP_NOT)
2216 exec_res= ev->apply_event(rli);
2218 #ifndef DBUG_OFF
2220 This only prints information to the debug trace.
2222 TODO: Print an informational message to the error log?
2224 static const char *const explain[] = {
2225 // EVENT_SKIP_NOT,
2226 "not skipped",
2227 // EVENT_SKIP_IGNORE,
2228 "skipped because event should be ignored",
2229 // EVENT_SKIP_COUNT
2230 "skipped because event skip counter was non-zero"
2232 DBUG_PRINT("info", ("OPTION_BEGIN: %d; IN_STMT: %d",
2233 thd->options & OPTION_BEGIN ? 1 : 0,
2234 rli->get_flag(Relay_log_info::IN_STMT)));
2235 DBUG_PRINT("skip_event", ("%s event was %s",
2236 ev->get_type_str(), explain[reason]));
2237 #endif
2239 DBUG_PRINT("info", ("apply_event error = %d", exec_res));
2240 if (exec_res == 0)
2242 int error= ev->update_pos(rli);
2243 #ifdef HAVE_purify
2244 if (!rli->is_fake)
2245 #endif
2247 #ifndef DBUG_OFF
2248 char buf[22];
2249 #endif
2250 DBUG_PRINT("info", ("update_pos error = %d", error));
2251 DBUG_PRINT("info", ("group %s %s",
2252 llstr(rli->group_relay_log_pos, buf),
2253 rli->group_relay_log_name));
2254 DBUG_PRINT("info", ("event %s %s",
2255 llstr(rli->event_relay_log_pos, buf),
2256 rli->event_relay_log_name));
2259 The update should not fail, so print an error message and
2260 return an error code.
2262 TODO: Replace this with a decent error message when merged
2263 with BUG#24954 (which adds several new error message).
2265 if (error)
2267 char buf[22];
2268 rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
2269 "It was not possible to update the positions"
2270 " of the relay log information: the slave may"
2271 " be in an inconsistent state."
2272 " Stopped in %s position %s",
2273 rli->group_relay_log_name,
2274 llstr(rli->group_relay_log_pos, buf));
2275 DBUG_RETURN(2);
2279 DBUG_RETURN(exec_res ? 1 : 0);
2284 Top-level function for executing the next event from the relay log.
2286 This function reads the event from the relay log, executes it, and
2287 advances the relay log position. It also handles errors, etc.
2289 This function may fail to apply the event for the following reasons:
2291 - The position specfied by the UNTIL condition of the START SLAVE
2292 command is reached.
2294 - It was not possible to read the event from the log.
2296 - The slave is killed.
2298 - An error occurred when applying the event, and the event has been
2299 tried slave_trans_retries times. If the event has been retried
2300 fewer times, 0 is returned.
2302 - init_master_info or init_relay_log_pos failed. (These are called
2303 if a failure occurs when applying the event.)
2305 - An error occurred when updating the binlog position.
2307 @retval 0 The event was applied.
2309 @retval 1 The event was not applied.
2311 static int exec_relay_log_event(THD* thd, Relay_log_info* rli)
2313 DBUG_ENTER("exec_relay_log_event");
2316 We acquire this mutex since we need it for all operations except
2317 event execution. But we will release it in places where we will
2318 wait for something for example inside of next_event().
2320 pthread_mutex_lock(&rli->data_lock);
2322 Log_event * ev = next_event(rli);
2324 DBUG_ASSERT(rli->sql_thd==thd);
2326 if (sql_slave_killed(thd,rli))
2328 pthread_mutex_unlock(&rli->data_lock);
2329 delete ev;
2330 DBUG_RETURN(1);
2332 if (ev)
2334 int exec_res;
2337 This tests if the position of the beginning of the current event
2338 hits the UNTIL barrier.
2340 if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
2341 rli->is_until_satisfied(thd, ev))
2343 char buf[22];
2344 sql_print_information("Slave SQL thread stopped because it reached its"
2345 " UNTIL position %s", llstr(rli->until_pos(), buf));
2347 Setting abort_slave flag because we do not want additional message about
2348 error in query execution to be printed.
2350 rli->abort_slave= 1;
2351 pthread_mutex_unlock(&rli->data_lock);
2352 delete ev;
2353 DBUG_RETURN(1);
2355 exec_res= apply_event_and_update_pos(ev, thd, rli);
2358 Format_description_log_event should not be deleted because it will be
2359 used to read info about the relay log's format; it will be deleted when
2360 the SQL thread does not need it, i.e. when this thread terminates.
2362 if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT &&
2363 !rli->is_deferred_event(ev))
2365 DBUG_PRINT("info", ("Deleting the event after it has been executed"));
2366 delete ev;
2370 update_log_pos failed: this should not happen, so we don't
2371 retry.
2373 if (exec_res == 2)
2374 DBUG_RETURN(1);
2376 if (slave_trans_retries)
2378 int UNINIT_VAR(temp_err);
2379 if (exec_res && (temp_err= has_temporary_error(thd)))
2381 const char *errmsg;
2383 We were in a transaction which has been rolled back because of a
2384 temporary error;
2385 let's seek back to BEGIN log event and retry it all again.
2386 Note, if lock wait timeout (innodb_lock_wait_timeout exceeded)
2387 there is no rollback since 5.0.13 (ref: manual).
2388 We have to not only seek but also
2389 a) init_master_info(), to seek back to hot relay log's start for later
2390 (for when we will come back to this hot log after re-processing the
2391 possibly existing old logs where BEGIN is: check_binlog_magic() will
2392 then need the cache to be at position 0 (see comments at beginning of
2393 init_master_info()).
2394 b) init_relay_log_pos(), because the BEGIN may be an older relay log.
2396 if (rli->trans_retries < slave_trans_retries)
2398 if (init_master_info(rli->mi, 0, 0, 0, SLAVE_SQL))
2399 sql_print_error("Failed to initialize the master info structure");
2400 else if (init_relay_log_pos(rli,
2401 rli->group_relay_log_name,
2402 rli->group_relay_log_pos,
2403 1, &errmsg, 1))
2404 sql_print_error("Error initializing relay log position: %s",
2405 errmsg);
2406 else
2408 exec_res= 0;
2409 rli->cleanup_context(thd, 1);
2410 /* chance for concurrent connection to get more locks */
2411 safe_sleep(thd, min(rli->trans_retries, MAX_SLAVE_RETRY_PAUSE),
2412 (CHECK_KILLED_FUNC)sql_slave_killed, (void*)rli);
2413 pthread_mutex_lock(&rli->data_lock); // because of SHOW STATUS
2414 rli->trans_retries++;
2415 rli->retried_trans++;
2416 pthread_mutex_unlock(&rli->data_lock);
2417 DBUG_PRINT("info", ("Slave retries transaction "
2418 "rli->trans_retries: %lu", rli->trans_retries));
2421 else
2422 sql_print_error("Slave SQL thread retried transaction %lu time(s) "
2423 "in vain, giving up. Consider raising the value of "
2424 "the slave_transaction_retries variable.",
2425 slave_trans_retries);
2427 else if ((exec_res && !temp_err) ||
2428 (opt_using_transactions &&
2429 rli->group_relay_log_pos == rli->event_relay_log_pos))
2432 Only reset the retry counter if the entire group succeeded
2433 or failed with a non-transient error. On a successful
2434 event, the execution will proceed as usual; in the case of a
2435 non-transient error, the slave will stop with an error.
2437 rli->trans_retries= 0; // restart from fresh
2438 DBUG_PRINT("info", ("Resetting retry counter, rli->trans_retries: %lu",
2439 rli->trans_retries));
2442 DBUG_RETURN(exec_res);
2444 pthread_mutex_unlock(&rli->data_lock);
2445 rli->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_READ_FAILURE,
2446 ER(ER_SLAVE_RELAY_LOG_READ_FAILURE), "\
2447 Could not parse relay log event entry. The possible reasons are: the master's \
2448 binary log is corrupted (you can check this by running 'mysqlbinlog' on the \
2449 binary log), the slave's relay log is corrupted (you can check this by running \
2450 'mysqlbinlog' on the relay log), a network problem, or a bug in the master's \
2451 or slave's MySQL code. If you want to check the master's binary log or slave's \
2452 relay log, you will be able to know their names by issuing 'SHOW SLAVE STATUS' \
2453 on this slave.\
2455 DBUG_RETURN(1);
2459 static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info)
2461 if (io_slave_killed(thd, mi))
2463 if (info && global_system_variables.log_warnings)
2464 sql_print_information("%s", info);
2465 return TRUE;
2467 return FALSE;
2472 @brief Try to reconnect slave IO thread.
2474 @details Terminates current connection to master, sleeps for
2475 @c mi->connect_retry msecs and initiates new connection with
2476 @c safe_reconnect(). Variable pointed by @c retry_count is increased -
2477 if it exceeds @c master_retry_count then connection is not re-established
2478 and function signals error.
2479 Unless @c suppres_warnings is TRUE, a warning is put in the server error log
2480 when reconnecting. The warning message and messages used to report errors
2481 are taken from @c messages array. In case @c master_retry_count is exceeded,
2482 no messages are added to the log.
2484 @param[in] thd Thread context.
2485 @param[in] mysql MySQL connection.
2486 @param[in] mi Master connection information.
2487 @param[in,out] retry_count Number of attempts to reconnect.
2488 @param[in] suppress_warnings TRUE when a normal net read timeout
2489 has caused to reconnecting.
2490 @param[in] messages Messages to print/log, see
2491 reconnect_messages[] array.
2493 @retval 0 OK.
2494 @retval 1 There was an error.
2497 static int try_to_reconnect(THD *thd, MYSQL *mysql, Master_info *mi,
2498 uint *retry_count, bool suppress_warnings,
2499 const char *messages[SLAVE_RECON_MSG_MAX])
2501 mi->slave_running= MYSQL_SLAVE_RUN_NOT_CONNECT;
2502 thd->proc_info= messages[SLAVE_RECON_MSG_WAIT];
2503 #ifdef SIGNAL_WITH_VIO_CLOSE
2504 thd->clear_active_vio();
2505 #endif
2506 end_server(mysql);
2507 if ((*retry_count)++)
2509 if (*retry_count > master_retry_count)
2510 return 1; // Don't retry forever
2511 safe_sleep(thd, mi->connect_retry, (CHECK_KILLED_FUNC) io_slave_killed,
2512 (void *) mi);
2514 if (check_io_slave_killed(thd, mi, messages[SLAVE_RECON_MSG_KILLED_WAITING]))
2515 return 1;
2516 thd->proc_info = messages[SLAVE_RECON_MSG_AFTER];
2517 if (!suppress_warnings)
2519 char buf[256], llbuff[22];
2520 my_snprintf(buf, sizeof(buf), messages[SLAVE_RECON_MSG_FAILED],
2521 IO_RPL_LOG_NAME, llstr(mi->master_log_pos, llbuff));
2523 Raise a warining during registering on master/requesting dump.
2524 Log a message reading event.
2526 if (messages[SLAVE_RECON_MSG_COMMAND][0])
2528 mi->report(WARNING_LEVEL, ER_SLAVE_MASTER_COM_FAILURE,
2529 ER(ER_SLAVE_MASTER_COM_FAILURE),
2530 messages[SLAVE_RECON_MSG_COMMAND], buf);
2532 else
2534 sql_print_information("%s", buf);
2537 if (safe_reconnect(thd, mysql, mi, 1) || io_slave_killed(thd, mi))
2539 if (global_system_variables.log_warnings)
2540 sql_print_information("%s", messages[SLAVE_RECON_MSG_KILLED_AFTER]);
2541 return 1;
2543 return 0;
2548 Slave IO thread entry point.
2550 @param arg Pointer to Master_info struct that holds information for
2551 the IO thread.
2553 @return Always 0.
2555 pthread_handler_t handle_slave_io(void *arg)
2557 THD *thd; // needs to be first for thread_stack
2558 MYSQL *mysql;
2559 Master_info *mi = (Master_info*)arg;
2560 Relay_log_info *rli= &mi->rli;
2561 char llbuff[22];
2562 uint retry_count;
2563 bool suppress_warnings;
2564 int ret;
2565 #ifndef DBUG_OFF
2566 uint retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0;
2567 #endif
2568 // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
2569 my_thread_init();
2570 DBUG_ENTER("handle_slave_io");
2572 DBUG_ASSERT(mi->inited);
2573 mysql= NULL ;
2574 retry_count= 0;
2576 pthread_mutex_lock(&mi->run_lock);
2577 /* Inform waiting threads that slave has started */
2578 mi->slave_run_id++;
2580 #ifndef DBUG_OFF
2581 mi->events_till_disconnect = disconnect_slave_event_count;
2582 #endif
2584 thd= new THD; // note that contructor of THD uses DBUG_ !
2585 THD_CHECK_SENTRY(thd);
2586 DBUG_ASSERT(mi->io_thd == 0);
2587 mi->io_thd = thd;
2589 pthread_detach_this_thread();
2590 thd->thread_stack= (char*) &thd; // remember where our stack is
2591 mi->clear_error();
2592 if (init_slave_thread(thd, SLAVE_THD_IO))
2594 pthread_cond_broadcast(&mi->start_cond);
2595 pthread_mutex_unlock(&mi->run_lock);
2596 sql_print_error("Failed during slave I/O thread initialization");
2597 goto err;
2599 pthread_mutex_lock(&LOCK_thread_count);
2600 threads.append(thd);
2601 pthread_mutex_unlock(&LOCK_thread_count);
2602 mi->slave_running = 1;
2603 mi->abort_slave = 0;
2604 pthread_mutex_unlock(&mi->run_lock);
2605 pthread_cond_broadcast(&mi->start_cond);
2607 DBUG_PRINT("master_info",("log_file_name: '%s' position: %s",
2608 mi->master_log_name,
2609 llstr(mi->master_log_pos,llbuff)));
2611 if (!(mi->mysql = mysql = mysql_init(NULL)))
2613 mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
2614 ER(ER_SLAVE_FATAL_ERROR), "error in mysql_init()");
2615 goto err;
2618 thd_proc_info(thd, "Connecting to master");
2619 // we can get killed during safe_connect
2620 if (!safe_connect(thd, mysql, mi))
2622 sql_print_information("Slave I/O thread: connected to master '%s@%s:%d',"
2623 "replication started in log '%s' at position %s",
2624 mi->user, mi->host, mi->port,
2625 IO_RPL_LOG_NAME,
2626 llstr(mi->master_log_pos,llbuff));
2628 Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O
2629 thread, since a replication event can become this much larger than
2630 the corresponding packet (query) sent from client to master.
2632 thd->net.max_packet_size= slave_max_allowed_packet;
2633 mysql->net.max_packet_size= thd->net.max_packet_size+= MAX_LOG_EVENT_HEADER;
2635 else
2637 sql_print_information("Slave I/O thread killed while connecting to master");
2638 goto err;
2641 connected:
2643 DBUG_EXECUTE_IF("dbug.before_get_running_status_yes",
2645 const char act[]=
2646 "now "
2647 "wait_for signal.io_thread_let_running";
2648 DBUG_ASSERT(opt_debug_sync_timeout > 0);
2649 DBUG_ASSERT(!debug_sync_set_action(thd,
2650 STRING_WITH_LEN(act)));
2651 };);
2653 // TODO: the assignment below should be under mutex (5.0)
2654 mi->slave_running= MYSQL_SLAVE_RUN_CONNECT;
2655 thd->slave_net = &mysql->net;
2656 thd_proc_info(thd, "Checking master version");
2657 ret= get_master_version_and_clock(mysql, mi);
2658 if (ret == 1)
2659 /* Fatal error */
2660 goto err;
2662 if (ret == 2)
2664 if (check_io_slave_killed(mi->io_thd, mi, "Slave I/O thread killed"
2665 "while calling get_master_version_and_clock(...)"))
2666 goto err;
2667 suppress_warnings= FALSE;
2668 /* Try to reconnect because the error was caused by a transient network problem */
2669 if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
2670 reconnect_messages[SLAVE_RECON_ACT_REG]))
2671 goto err;
2672 goto connected;
2675 if (mi->rli.relay_log.description_event_for_queue->binlog_version > 1)
2678 Register ourselves with the master.
2680 thd_proc_info(thd, "Registering slave on master");
2681 if (register_slave_on_master(mysql, mi, &suppress_warnings))
2683 if (!check_io_slave_killed(thd, mi, "Slave I/O thread killed "
2684 "while registering slave on master"))
2686 sql_print_error("Slave I/O thread couldn't register on master");
2687 if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
2688 reconnect_messages[SLAVE_RECON_ACT_REG]))
2689 goto err;
2691 else
2692 goto err;
2693 goto connected;
2695 DBUG_EXECUTE_IF("FORCE_SLAVE_TO_RECONNECT_REG",
2696 if (!retry_count_reg)
2698 retry_count_reg++;
2699 sql_print_information("Forcing to reconnect slave I/O thread");
2700 if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
2701 reconnect_messages[SLAVE_RECON_ACT_REG]))
2702 goto err;
2703 goto connected;
2707 DBUG_PRINT("info",("Starting reading binary log from master"));
2708 while (!io_slave_killed(thd,mi))
2710 thd_proc_info(thd, "Requesting binlog dump");
2711 if (request_dump(mysql, mi, &suppress_warnings))
2713 sql_print_error("Failed on request_dump()");
2714 if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \
2715 requesting master dump") ||
2716 try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
2717 reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2718 goto err;
2719 goto connected;
2721 DBUG_EXECUTE_IF("FORCE_SLAVE_TO_RECONNECT_DUMP",
2722 if (!retry_count_dump)
2724 retry_count_dump++;
2725 sql_print_information("Forcing to reconnect slave I/O thread");
2726 if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
2727 reconnect_messages[SLAVE_RECON_ACT_DUMP]))
2728 goto err;
2729 goto connected;
2732 DBUG_ASSERT(mi->last_error().number == 0);
2733 while (!io_slave_killed(thd,mi))
2735 ulong event_len;
2737 We say "waiting" because read_event() will wait if there's nothing to
2738 read. But if there's something to read, it will not wait. The
2739 important thing is to not confuse users by saying "reading" whereas
2740 we're in fact receiving nothing.
2742 thd_proc_info(thd, "Waiting for master to send event");
2743 event_len= read_event(mysql, mi, &suppress_warnings);
2744 if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \
2745 reading event"))
2746 goto err;
2747 DBUG_EXECUTE_IF("FORCE_SLAVE_TO_RECONNECT_EVENT",
2748 if (!retry_count_event)
2750 retry_count_event++;
2751 sql_print_information("Forcing to reconnect slave I/O thread");
2752 if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
2753 reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2754 goto err;
2755 goto connected;
2758 if (event_len == packet_error)
2760 uint mysql_error_number= mysql_errno(mysql);
2761 switch (mysql_error_number) {
2762 case CR_NET_PACKET_TOO_LARGE:
2763 sql_print_error("\
2764 Log entry on master is longer than slave_max_allowed_packet (%lu) on \
2765 slave. If the entry is correct, restart the server with a higher value of \
2766 slave_max_allowed_packet",
2767 slave_max_allowed_packet);
2768 mi->report(ERROR_LEVEL, ER_NET_PACKET_TOO_LARGE,
2769 "%s", "Got a packet bigger than 'slave_max_allowed_packet' bytes");
2770 goto err;
2771 case ER_MASTER_FATAL_ERROR_READING_BINLOG:
2772 mi->report(ERROR_LEVEL, ER_MASTER_FATAL_ERROR_READING_BINLOG,
2773 ER(ER_MASTER_FATAL_ERROR_READING_BINLOG),
2774 mysql_error_number, mysql_error(mysql));
2775 goto err;
2776 case ER_OUT_OF_RESOURCES:
2777 sql_print_error("\
2778 Stopping slave I/O thread due to out-of-memory error from master");
2779 mi->report(ERROR_LEVEL, ER_OUT_OF_RESOURCES,
2780 "%s", ER(ER_OUT_OF_RESOURCES));
2781 goto err;
2783 if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
2784 reconnect_messages[SLAVE_RECON_ACT_EVENT]))
2785 goto err;
2786 goto connected;
2787 } // if (event_len == packet_error)
2789 retry_count=0; // ok event, reset retry counter
2790 thd_proc_info(thd, "Queueing master event to the relay log");
2791 if (queue_event(mi,(const char*)mysql->net.read_pos + 1,
2792 event_len))
2794 mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
2795 ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
2796 "could not queue event from master");
2797 goto err;
2799 if (flush_master_info(mi, TRUE, TRUE))
2801 sql_print_error("Failed to flush master info file");
2802 goto err;
2805 See if the relay logs take too much space.
2806 We don't lock mi->rli.log_space_lock here; this dirty read saves time
2807 and does not introduce any problem:
2808 - if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so
2809 the clean value is 0), then we are reading only one more event as we
2810 should, and we'll block only at the next event. No big deal.
2811 - if mi->rli.ignore_log_space_limit is 0 but becomes 1 just after (so
2812 the clean value is 1), then we are going into wait_for_relay_log_space()
2813 for no reason, but this function will do a clean read, notice the clean
2814 value and exit immediately.
2816 #ifndef DBUG_OFF
2818 char llbuf1[22], llbuf2[22];
2819 DBUG_PRINT("info", ("log_space_limit=%s log_space_total=%s \
2820 ignore_log_space_limit=%d",
2821 llstr(rli->log_space_limit,llbuf1),
2822 llstr(rli->log_space_total,llbuf2),
2823 (int) rli->ignore_log_space_limit));
2825 #endif
2827 if (rli->log_space_limit && rli->log_space_limit <
2828 rli->log_space_total &&
2829 !rli->ignore_log_space_limit)
2830 if (wait_for_relay_log_space(rli))
2832 sql_print_error("Slave I/O thread aborted while waiting for relay \
2833 log space");
2834 goto err;
2839 // error = 0;
2840 err:
2841 // print the current replication position
2842 sql_print_information("Slave I/O thread exiting, read up to log '%s', position %s",
2843 IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
2844 thd->set_query(NULL, 0);
2845 thd->reset_db(NULL, 0);
2846 if (mysql)
2849 Here we need to clear the active VIO before closing the
2850 connection with the master. The reason is that THD::awake()
2851 might be called from terminate_slave_thread() because somebody
2852 issued a STOP SLAVE. If that happends, the close_active_vio()
2853 can be called in the middle of closing the VIO associated with
2854 the 'mysql' object, causing a crash.
2856 #ifdef SIGNAL_WITH_VIO_CLOSE
2857 thd->clear_active_vio();
2858 #endif
2859 mysql_close(mysql);
2860 mi->mysql=0;
2862 write_ignored_events_info_to_relay_log(thd, mi);
2863 thd_proc_info(thd, "Waiting for slave mutex on exit");
2864 pthread_mutex_lock(&mi->run_lock);
2866 /* Forget the relay log's format */
2867 delete mi->rli.relay_log.description_event_for_queue;
2868 mi->rli.relay_log.description_event_for_queue= 0;
2869 // TODO: make rpl_status part of Master_info
2870 change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
2871 DBUG_ASSERT(thd->net.buff != 0);
2872 net_end(&thd->net); // destructor will not free it, because net.vio is 0
2873 close_thread_tables(thd);
2874 pthread_mutex_lock(&LOCK_thread_count);
2875 THD_CHECK_SENTRY(thd);
2876 delete thd;
2877 pthread_mutex_unlock(&LOCK_thread_count);
2878 mi->abort_slave= 0;
2879 mi->slave_running= 0;
2880 mi->io_thd= 0;
2882 Note: the order of the two following calls (first broadcast, then unlock)
2883 is important. Otherwise a killer_thread can execute between the calls and
2884 delete the mi structure leading to a crash! (see BUG#25306 for details)
2886 pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done
2887 DBUG_EXECUTE_IF("simulate_slave_delay_at_terminate_bug38694", sleep(5););
2888 pthread_mutex_unlock(&mi->run_lock);
2890 DBUG_LEAVE; // Must match DBUG_ENTER()
2891 my_thread_end();
2892 pthread_exit(0);
2893 return 0; // Avoid compiler warnings
2897 Check the temporary directory used by commands like
2898 LOAD DATA INFILE.
2900 static
2901 int check_temp_dir(char* tmp_file)
2903 int fd;
2904 MY_DIR *dirp;
2905 char tmp_dir[FN_REFLEN];
2906 size_t tmp_dir_size;
2908 DBUG_ENTER("check_temp_dir");
2911 Get the directory from the temporary file.
2913 dirname_part(tmp_dir, tmp_file, &tmp_dir_size);
2916 Check if the directory exists.
2918 if (!(dirp=my_dir(tmp_dir,MYF(MY_WME))))
2919 DBUG_RETURN(1);
2920 my_dirend(dirp);
2923 Check permissions to create a file.
2925 if ((fd= my_create(tmp_file, CREATE_MODE,
2926 O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
2927 MYF(MY_WME))) < 0)
2928 DBUG_RETURN(1);
2931 Clean up.
2933 my_close(fd, MYF(0));
2934 my_delete(tmp_file, MYF(0));
2936 DBUG_RETURN(0);
2940 Slave SQL thread entry point.
2942 @param arg Pointer to Relay_log_info object that holds information
2943 for the SQL thread.
2945 @return Always 0.
2947 pthread_handler_t handle_slave_sql(void *arg)
2949 THD *thd; /* needs to be first for thread_stack */
2950 char llbuff[22],llbuff1[22];
2951 char saved_log_name[FN_REFLEN];
2952 char saved_master_log_name[FN_REFLEN];
2953 my_off_t UNINIT_VAR(saved_log_pos);
2954 my_off_t UNINIT_VAR(saved_master_log_pos);
2955 my_off_t saved_skip= 0;
2957 Relay_log_info* rli = &((Master_info*)arg)->rli;
2958 const char *errmsg;
2960 // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
2961 my_thread_init();
2962 DBUG_ENTER("handle_slave_sql");
2964 DBUG_ASSERT(rli->inited);
2965 pthread_mutex_lock(&rli->run_lock);
2966 DBUG_ASSERT(!rli->slave_running);
2967 errmsg= 0;
2968 #ifndef DBUG_OFF
2969 rli->events_till_abort = abort_slave_event_count;
2970 #endif
2972 thd = new THD; // note that contructor of THD uses DBUG_ !
2973 thd->thread_stack = (char*)&thd; // remember where our stack is
2974 rli->sql_thd= thd;
2976 /* Inform waiting threads that slave has started */
2977 rli->slave_run_id++;
2978 rli->slave_running = 1;
2980 pthread_detach_this_thread();
2981 if (init_slave_thread(thd, SLAVE_THD_SQL))
2984 TODO: this is currently broken - slave start and change master
2985 will be stuck if we fail here
2987 pthread_cond_broadcast(&rli->start_cond);
2988 pthread_mutex_unlock(&rli->run_lock);
2989 rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
2990 "Failed during slave thread initialization");
2991 goto err;
2993 thd->init_for_queries();
2994 thd->rli_slave= rli;
2995 if ((rli->deferred_events_collecting= rpl_filter->is_on()))
2997 rli->deferred_events= new Deferred_log_events(rli);
3000 thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
3001 set_thd_in_use_temporary_tables(rli); // (re)set sql_thd in use for saved temp tables
3002 pthread_mutex_lock(&LOCK_thread_count);
3003 threads.append(thd);
3004 pthread_mutex_unlock(&LOCK_thread_count);
3006 We are going to set slave_running to 1. Assuming slave I/O thread is
3007 alive and connected, this is going to make Seconds_Behind_Master be 0
3008 i.e. "caught up". Even if we're just at start of thread. Well it's ok, at
3009 the moment we start we can think we are caught up, and the next second we
3010 start receiving data so we realize we are not caught up and
3011 Seconds_Behind_Master grows. No big deal.
3013 rli->abort_slave = 0;
3014 pthread_mutex_unlock(&rli->run_lock);
3015 pthread_cond_broadcast(&rli->start_cond);
3018 Reset errors for a clean start (otherwise, if the master is idle, the SQL
3019 thread may execute no Query_log_event, so the error will remain even
3020 though there's no problem anymore). Do not reset the master timestamp
3021 (imagine the slave has caught everything, then STOP SLAVE and START SLAVE:
3022 as we are not sure that we are going to receive a query, we want to
3023 remember the last master timestamp (to say how many seconds behind we are
3024 now.
3025 But the master timestamp is reset by RESET SLAVE & CHANGE MASTER.
3027 rli->clear_error();
3029 //tell the I/O thread to take relay_log_space_limit into account from now on
3030 pthread_mutex_lock(&rli->log_space_lock);
3031 rli->ignore_log_space_limit= 0;
3032 pthread_mutex_unlock(&rli->log_space_lock);
3033 rli->trans_retries= 0; // start from "no error"
3034 DBUG_PRINT("info", ("rli->trans_retries: %lu", rli->trans_retries));
3036 if (init_relay_log_pos(rli,
3037 rli->group_relay_log_name,
3038 rli->group_relay_log_pos,
3039 1 /*need data lock*/, &errmsg,
3040 1 /*look for a description_event*/))
3042 rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
3043 "Error initializing relay log position: %s", errmsg);
3044 goto err;
3046 THD_CHECK_SENTRY(thd);
3047 #ifndef DBUG_OFF
3049 char llbuf1[22], llbuf2[22];
3050 DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s",
3051 llstr(my_b_tell(rli->cur_log),llbuf1),
3052 llstr(rli->event_relay_log_pos,llbuf2)));
3053 DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
3055 Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
3056 correct position when it's called just after my_b_seek() (the questionable
3057 stuff is those "seek is done on next read" comments in the my_b_seek()
3058 source code).
3059 The crude reality is that this assertion randomly fails whereas
3060 replication seems to work fine. And there is no easy explanation why it
3061 fails (as we my_b_seek(rli->event_relay_log_pos) at the very end of
3062 init_relay_log_pos() called above). Maybe the assertion would be
3063 meaningful if we held rli->data_lock between the my_b_seek() and the
3064 DBUG_ASSERT().
3066 #ifdef SHOULD_BE_CHECKED
3067 DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
3068 #endif
3070 #endif
3071 DBUG_ASSERT(rli->sql_thd == thd);
3073 DBUG_PRINT("master_info",("log_file_name: %s position: %s",
3074 rli->group_master_log_name,
3075 llstr(rli->group_master_log_pos,llbuff)));
3076 if (global_system_variables.log_warnings)
3077 sql_print_information("Slave SQL thread initialized, starting replication in \
3078 log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
3079 llstr(rli->group_master_log_pos,llbuff),rli->group_relay_log_name,
3080 llstr(rli->group_relay_log_pos,llbuff1));
3082 if (check_temp_dir(rli->slave_patternload_file))
3084 rli->report(ERROR_LEVEL, thd->main_da.sql_errno(),
3085 "Unable to use slave's temporary directory %s - %s",
3086 slave_load_tmpdir, thd->main_da.message());
3087 goto err;
3090 /* execute init_slave variable */
3091 if (sys_init_slave.value_length)
3093 execute_init_command(thd, &sys_init_slave, &LOCK_sys_init_slave);
3094 if (thd->is_slave_error)
3096 rli->report(ERROR_LEVEL, thd->main_da.sql_errno(),
3097 "Slave SQL thread aborted. Can't execute init_slave query");
3098 goto err;
3103 First check until condition - probably there is nothing to execute. We
3104 do not want to wait for next event in this case.
3106 pthread_mutex_lock(&rli->data_lock);
3107 if (rli->slave_skip_counter)
3109 strmake(saved_log_name, rli->group_relay_log_name, FN_REFLEN - 1);
3110 strmake(saved_master_log_name, rli->group_master_log_name, FN_REFLEN - 1);
3111 saved_log_pos= rli->group_relay_log_pos;
3112 saved_master_log_pos= rli->group_master_log_pos;
3113 saved_skip= rli->slave_skip_counter;
3115 if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
3116 rli->is_until_satisfied(thd, NULL))
3118 char buf[22];
3119 sql_print_information("Slave SQL thread stopped because it reached its"
3120 " UNTIL position %s", llstr(rli->until_pos(), buf));
3121 pthread_mutex_unlock(&rli->data_lock);
3122 goto err;
3124 pthread_mutex_unlock(&rli->data_lock);
3126 /* Read queries from the IO/THREAD until this thread is killed */
3128 while (!sql_slave_killed(thd,rli))
3130 thd_proc_info(thd, "Reading event from the relay log");
3131 DBUG_ASSERT(rli->sql_thd == thd);
3132 THD_CHECK_SENTRY(thd);
3134 if (saved_skip && rli->slave_skip_counter == 0)
3136 sql_print_information("'SQL_SLAVE_SKIP_COUNTER=%ld' executed at "
3137 "relay_log_file='%s', relay_log_pos='%ld', master_log_name='%s', "
3138 "master_log_pos='%ld' and new position at "
3139 "relay_log_file='%s', relay_log_pos='%ld', master_log_name='%s', "
3140 "master_log_pos='%ld' ",
3141 (ulong) saved_skip, saved_log_name, (ulong) saved_log_pos,
3142 saved_master_log_name, (ulong) saved_master_log_pos,
3143 rli->group_relay_log_name, (ulong) rli->group_relay_log_pos,
3144 rli->group_master_log_name, (ulong) rli->group_master_log_pos);
3145 saved_skip= 0;
3148 if (exec_relay_log_event(thd,rli))
3150 DBUG_PRINT("info", ("exec_relay_log_event() failed"));
3151 // do not scare the user if SQL thread was simply killed or stopped
3152 if (!sql_slave_killed(thd,rli))
3155 retrieve as much info as possible from the thd and, error
3156 codes and warnings and print this to the error log as to
3157 allow the user to locate the error
3159 uint32 const last_errno= rli->last_error().number;
3161 if (thd->is_error())
3163 char const *const errmsg= thd->main_da.message();
3165 DBUG_PRINT("info",
3166 ("thd->main_da.sql_errno()=%d; rli->last_error.number=%d",
3167 thd->main_da.sql_errno(), last_errno));
3168 if (last_errno == 0)
3171 This function is reporting an error which was not reported
3172 while executing exec_relay_log_event().
3174 rli->report(ERROR_LEVEL, thd->main_da.sql_errno(), "%s", errmsg);
3176 else if (last_errno != thd->main_da.sql_errno())
3179 * An error was reported while executing exec_relay_log_event()
3180 * however the error code differs from what is in the thread.
3181 * This function prints out more information to help finding
3182 * what caused the problem.
3184 sql_print_error("Slave (additional info): %s Error_code: %d",
3185 errmsg, thd->main_da.sql_errno());
3189 /* Print any warnings issued */
3190 List_iterator_fast<MYSQL_ERROR> it(thd->warn_list);
3191 MYSQL_ERROR *err;
3193 Added controlled slave thread cancel for replication
3194 of user-defined variables.
3196 bool udf_error = false;
3197 while ((err= it++))
3199 if (err->code == ER_CANT_OPEN_LIBRARY)
3200 udf_error = true;
3201 sql_print_warning("Slave: %s Error_code: %d",err->msg, err->code);
3203 if (udf_error)
3204 sql_print_error("Error loading user-defined library, slave SQL "
3205 "thread aborted. Install the missing library, and restart the "
3206 "slave SQL thread with \"SLAVE START\". We stopped at log '%s' "
3207 "position %s", RPL_LOG_NAME, llstr(rli->group_master_log_pos,
3208 llbuff));
3209 else
3210 sql_print_error("\
3211 Error running query, slave SQL thread aborted. Fix the problem, and restart \
3212 the slave SQL thread with \"SLAVE START\". We stopped at log \
3213 '%s' position %s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff));
3215 goto err;
3219 /* Thread stopped. Print the current replication position to the log */
3220 sql_print_information("Slave SQL thread exiting, replication stopped in log "
3221 "'%s' at position %s",
3222 RPL_LOG_NAME, llstr(rli->group_master_log_pos,llbuff));
3224 err:
3227 Some events set some playgrounds, which won't be cleared because thread
3228 stops. Stopping of this thread may not be known to these events ("stop"
3229 request is detected only by the present function, not by events), so we
3230 must "proactively" clear playgrounds:
3232 thd->clear_error();
3233 rli->cleanup_context(thd, 1);
3235 Some extra safety, which should not been needed (normally, event deletion
3236 should already have done these assignments (each event which sets these
3237 variables is supposed to set them to 0 before terminating)).
3239 thd->catalog= 0;
3240 thd->set_query(NULL, 0);
3241 thd->reset_db(NULL, 0);
3242 thd_proc_info(thd, "Waiting for slave mutex on exit");
3243 pthread_mutex_lock(&rli->run_lock);
3244 /* We need data_lock, at least to wake up any waiting master_pos_wait() */
3245 pthread_mutex_lock(&rli->data_lock);
3246 DBUG_ASSERT(rli->slave_running == 1); // tracking buffer overrun
3247 /* When master_pos_wait() wakes up it will check this and terminate */
3248 rli->slave_running= 0;
3249 /* Forget the relay log's format */
3250 delete rli->relay_log.description_event_for_exec;
3251 rli->relay_log.description_event_for_exec= 0;
3252 /* Wake up master_pos_wait() */
3253 pthread_mutex_unlock(&rli->data_lock);
3254 DBUG_PRINT("info",("Signaling possibly waiting master_pos_wait() functions"));
3255 pthread_cond_broadcast(&rli->data_cond);
3256 rli->ignore_log_space_limit= 0; /* don't need any lock */
3257 /* we die so won't remember charset - re-update them on next thread start */
3258 rli->cached_charset_invalidate();
3259 rli->save_temporary_tables = thd->temporary_tables;
3262 TODO: see if we can do this conditionally in next_event() instead
3263 to avoid unneeded position re-init
3265 thd->temporary_tables = 0; // remove tempation from destructor to close them
3266 DBUG_ASSERT(thd->net.buff != 0);
3267 net_end(&thd->net); // destructor will not free it, because we are weird
3268 DBUG_ASSERT(rli->sql_thd == thd);
3269 THD_CHECK_SENTRY(thd);
3270 rli->sql_thd= 0;
3271 set_thd_in_use_temporary_tables(rli); // (re)set sql_thd in use for saved temp tables
3272 pthread_mutex_lock(&LOCK_thread_count);
3273 THD_CHECK_SENTRY(thd);
3274 delete thd;
3275 pthread_mutex_unlock(&LOCK_thread_count);
3277 Note: the order of the broadcast and unlock calls below (first broadcast, then unlock)
3278 is important. Otherwise a killer_thread can execute between the calls and
3279 delete the mi structure leading to a crash! (see BUG#25306 for details)
3281 pthread_cond_broadcast(&rli->stop_cond);
3282 DBUG_EXECUTE_IF("simulate_slave_delay_at_terminate_bug38694", sleep(5););
3283 pthread_mutex_unlock(&rli->run_lock); // tell the world we are done
3285 DBUG_LEAVE; // Must match DBUG_ENTER()
3286 my_thread_end();
3287 pthread_exit(0);
3288 return 0; // Avoid compiler warnings
3293 process_io_create_file()
3296 static int process_io_create_file(Master_info* mi, Create_file_log_event* cev)
3298 int error = 1;
3299 ulong num_bytes;
3300 bool cev_not_written;
3301 THD *thd = mi->io_thd;
3302 NET *net = &mi->mysql->net;
3303 DBUG_ENTER("process_io_create_file");
3305 if (unlikely(!cev->is_valid()))
3306 DBUG_RETURN(1);
3308 if (!rpl_filter->db_ok(cev->db))
3310 skip_load_data_infile(net);
3311 DBUG_RETURN(0);
3313 DBUG_ASSERT(cev->inited_from_old);
3314 thd->file_id = cev->file_id = mi->file_id++;
3315 thd->server_id = cev->server_id;
3316 cev_not_written = 1;
3318 if (unlikely(net_request_file(net,cev->fname)))
3320 sql_print_error("Slave I/O: failed requesting download of '%s'",
3321 cev->fname);
3322 goto err;
3326 This dummy block is so we could instantiate Append_block_log_event
3327 once and then modify it slightly instead of doing it multiple times
3328 in the loop
3331 Append_block_log_event aev(thd,0,0,0,0);
3333 for (;;)
3335 if (unlikely((num_bytes=my_net_read(net)) == packet_error))
3337 sql_print_error("Network read error downloading '%s' from master",
3338 cev->fname);
3339 goto err;
3341 if (unlikely(!num_bytes)) /* eof */
3343 /* 3.23 master wants it */
3344 net_write_command(net, 0, (uchar*) "", 0, (uchar*) "", 0);
3346 If we wrote Create_file_log_event, then we need to write
3347 Execute_load_log_event. If we did not write Create_file_log_event,
3348 then this is an empty file and we can just do as if the LOAD DATA
3349 INFILE had not existed, i.e. write nothing.
3351 if (unlikely(cev_not_written))
3352 break;
3353 Execute_load_log_event xev(thd,0,0);
3354 xev.log_pos = cev->log_pos;
3355 if (unlikely(mi->rli.relay_log.append(&xev)))
3357 mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
3358 ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
3359 "error writing Exec_load event to relay log");
3360 goto err;
3362 mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
3363 break;
3365 if (unlikely(cev_not_written))
3367 cev->block = net->read_pos;
3368 cev->block_len = num_bytes;
3369 if (unlikely(mi->rli.relay_log.append(cev)))
3371 mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
3372 ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
3373 "error writing Create_file event to relay log");
3374 goto err;
3376 cev_not_written=0;
3377 mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
3379 else
3381 aev.block = net->read_pos;
3382 aev.block_len = num_bytes;
3383 aev.log_pos = cev->log_pos;
3384 if (unlikely(mi->rli.relay_log.append(&aev)))
3386 mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
3387 ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
3388 "error writing Append_block event to relay log");
3389 goto err;
3391 mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ;
3395 error=0;
3396 err:
3397 DBUG_RETURN(error);
3402 Start using a new binary log on the master
3404 SYNOPSIS
3405 process_io_rotate()
3406 mi master_info for the slave
3407 rev The rotate log event read from the binary log
3409 DESCRIPTION
3410 Updates the master info with the place in the next binary
3411 log where we should start reading.
3412 Rotate the relay log to avoid mixed-format relay logs.
3414 NOTES
3415 We assume we already locked mi->data_lock
3417 RETURN VALUES
3418 0 ok
3419 1 Log event is illegal
3423 static int process_io_rotate(Master_info *mi, Rotate_log_event *rev)
3425 DBUG_ENTER("process_io_rotate");
3426 safe_mutex_assert_owner(&mi->data_lock);
3428 if (unlikely(!rev->is_valid()))
3429 DBUG_RETURN(1);
3431 /* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */
3432 memcpy(mi->master_log_name, rev->new_log_ident, rev->ident_len+1);
3433 mi->master_log_pos= rev->pos;
3434 DBUG_PRINT("info", ("master_log_pos: '%s' %lu",
3435 mi->master_log_name, (ulong) mi->master_log_pos));
3436 #ifndef DBUG_OFF
3438 If we do not do this, we will be getting the first
3439 rotate event forever, so we need to not disconnect after one.
3441 if (disconnect_slave_event_count)
3442 mi->events_till_disconnect++;
3443 #endif
3446 If description_event_for_queue is format <4, there is conversion in the
3447 relay log to the slave's format (4). And Rotate can mean upgrade or
3448 nothing. If upgrade, it's to 5.0 or newer, so we will get a Format_desc, so
3449 no need to reset description_event_for_queue now. And if it's nothing (same
3450 master version as before), no need (still using the slave's format).
3452 if (mi->rli.relay_log.description_event_for_queue->binlog_version >= 4)
3454 delete mi->rli.relay_log.description_event_for_queue;
3455 /* start from format 3 (MySQL 4.0) again */
3456 mi->rli.relay_log.description_event_for_queue= new
3457 Format_description_log_event(3);
3460 Rotate the relay log makes binlog format detection easier (at next slave
3461 start or mysqlbinlog)
3463 DBUG_RETURN(rotate_relay_log(mi) /* will take the right mutexes */);
3467 Reads a 3.23 event and converts it to the slave's format. This code was
3468 copied from MySQL 4.0.
3470 static int queue_binlog_ver_1_event(Master_info *mi, const char *buf,
3471 ulong event_len)
3473 const char *errmsg = 0;
3474 ulong inc_pos;
3475 bool ignore_event= 0;
3476 char *tmp_buf = 0;
3477 Relay_log_info *rli= &mi->rli;
3478 DBUG_ENTER("queue_binlog_ver_1_event");
3481 If we get Load event, we need to pass a non-reusable buffer
3482 to read_log_event, so we do a trick
3484 if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
3486 if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
3488 mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
3489 ER(ER_SLAVE_FATAL_ERROR), "Memory allocation failed");
3490 DBUG_RETURN(1);
3492 memcpy(tmp_buf,buf,event_len);
3494 Create_file constructor wants a 0 as last char of buffer, this 0 will
3495 serve as the string-termination char for the file's name (which is at the
3496 end of the buffer)
3497 We must increment event_len, otherwise the event constructor will not see
3498 this end 0, which leads to segfault.
3500 tmp_buf[event_len++]=0;
3501 int4store(tmp_buf+EVENT_LEN_OFFSET, event_len);
3502 buf = (const char*)tmp_buf;
3505 This will transform LOAD_EVENT into CREATE_FILE_EVENT, ask the master to
3506 send the loaded file, and write it to the relay log in the form of
3507 Append_block/Exec_load (the SQL thread needs the data, as that thread is not
3508 connected to the master).
3510 Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
3511 mi->rli.relay_log.description_event_for_queue);
3512 if (unlikely(!ev))
3514 sql_print_error("Read invalid event from master: '%s',\
3515 master could be corrupt but a more likely cause of this is a bug",
3516 errmsg);
3517 my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
3518 DBUG_RETURN(1);
3521 pthread_mutex_lock(&mi->data_lock);
3522 ev->log_pos= mi->master_log_pos; /* 3.23 events don't contain log_pos */
3523 switch (ev->get_type_code()) {
3524 case STOP_EVENT:
3525 ignore_event= 1;
3526 inc_pos= event_len;
3527 break;
3528 case ROTATE_EVENT:
3529 if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
3531 delete ev;
3532 pthread_mutex_unlock(&mi->data_lock);
3533 DBUG_RETURN(1);
3535 inc_pos= 0;
3536 break;
3537 case CREATE_FILE_EVENT:
3539 Yes it's possible to have CREATE_FILE_EVENT here, even if we're in
3540 queue_old_event() which is for 3.23 events which don't comprise
3541 CREATE_FILE_EVENT. This is because read_log_event() above has just
3542 transformed LOAD_EVENT into CREATE_FILE_EVENT.
3545 /* We come here when and only when tmp_buf != 0 */
3546 DBUG_ASSERT(tmp_buf != 0);
3547 inc_pos=event_len;
3548 ev->log_pos+= inc_pos;
3549 int error = process_io_create_file(mi,(Create_file_log_event*)ev);
3550 delete ev;
3551 mi->master_log_pos += inc_pos;
3552 DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
3553 pthread_mutex_unlock(&mi->data_lock);
3554 my_free((char*)tmp_buf, MYF(0));
3555 DBUG_RETURN(error);
3557 default:
3558 inc_pos= event_len;
3559 break;
3561 if (likely(!ignore_event))
3563 if (ev->log_pos)
3565 Don't do it for fake Rotate events (see comment in
3566 Log_event::Log_event(const char* buf...) in log_event.cc).
3568 ev->log_pos+= event_len; /* make log_pos be the pos of the end of the event */
3569 if (unlikely(rli->relay_log.append(ev)))
3571 delete ev;
3572 pthread_mutex_unlock(&mi->data_lock);
3573 DBUG_RETURN(1);
3575 rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3577 delete ev;
3578 mi->master_log_pos+= inc_pos;
3579 DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
3580 pthread_mutex_unlock(&mi->data_lock);
3581 DBUG_RETURN(0);
3585 Reads a 4.0 event and converts it to the slave's format. This code was copied
3586 from queue_binlog_ver_1_event(), with some affordable simplifications.
3588 static int queue_binlog_ver_3_event(Master_info *mi, const char *buf,
3589 ulong event_len)
3591 const char *errmsg = 0;
3592 ulong inc_pos;
3593 char *tmp_buf = 0;
3594 Relay_log_info *rli= &mi->rli;
3595 DBUG_ENTER("queue_binlog_ver_3_event");
3597 /* read_log_event() will adjust log_pos to be end_log_pos */
3598 Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
3599 mi->rli.relay_log.description_event_for_queue);
3600 if (unlikely(!ev))
3602 sql_print_error("Read invalid event from master: '%s',\
3603 master could be corrupt but a more likely cause of this is a bug",
3604 errmsg);
3605 my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
3606 DBUG_RETURN(1);
3608 pthread_mutex_lock(&mi->data_lock);
3609 switch (ev->get_type_code()) {
3610 case STOP_EVENT:
3611 goto err;
3612 case ROTATE_EVENT:
3613 if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
3615 delete ev;
3616 pthread_mutex_unlock(&mi->data_lock);
3617 DBUG_RETURN(1);
3619 inc_pos= 0;
3620 break;
3621 default:
3622 inc_pos= event_len;
3623 break;
3625 if (unlikely(rli->relay_log.append(ev)))
3627 delete ev;
3628 pthread_mutex_unlock(&mi->data_lock);
3629 DBUG_RETURN(1);
3631 rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3632 delete ev;
3633 mi->master_log_pos+= inc_pos;
3634 err:
3635 DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
3636 pthread_mutex_unlock(&mi->data_lock);
3637 DBUG_RETURN(0);
3641 queue_old_event()
3643 Writes a 3.23 or 4.0 event to the relay log, after converting it to the 5.0
3644 (exactly, slave's) format. To do the conversion, we create a 5.0 event from
3645 the 3.23/4.0 bytes, then write this event to the relay log.
3647 TODO:
3648 Test this code before release - it has to be tested on a separate
3649 setup with 3.23 master or 4.0 master
3652 static int queue_old_event(Master_info *mi, const char *buf,
3653 ulong event_len)
3655 DBUG_ENTER("queue_old_event");
3657 switch (mi->rli.relay_log.description_event_for_queue->binlog_version)
3659 case 1:
3660 DBUG_RETURN(queue_binlog_ver_1_event(mi,buf,event_len));
3661 case 3:
3662 DBUG_RETURN(queue_binlog_ver_3_event(mi,buf,event_len));
3663 default: /* unsupported format; eg version 2 */
3664 DBUG_PRINT("info",("unsupported binlog format %d in queue_old_event()",
3665 mi->rli.relay_log.description_event_for_queue->binlog_version));
3666 DBUG_RETURN(1);
3671 queue_event()
3673 If the event is 3.23/4.0, passes it to queue_old_event() which will convert
3674 it. Otherwise, writes a 5.0 (or newer) event to the relay log. Then there is
3675 no format conversion, it's pure read/write of bytes.
3676 So a 5.0.0 slave's relay log can contain events in the slave's format or in
3677 any >=5.0.0 format.
3680 static int queue_event(Master_info* mi,const char* buf, ulong event_len)
3682 int error= 0;
3683 ulong inc_pos;
3684 Relay_log_info *rli= &mi->rli;
3685 pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
3686 DBUG_ENTER("queue_event");
3688 LINT_INIT(inc_pos);
3690 if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
3691 buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
3692 DBUG_RETURN(queue_old_event(mi,buf,event_len));
3694 LINT_INIT(inc_pos);
3695 pthread_mutex_lock(&mi->data_lock);
3697 switch (buf[EVENT_TYPE_OFFSET]) {
3698 case STOP_EVENT:
3700 We needn't write this event to the relay log. Indeed, it just indicates a
3701 master server shutdown. The only thing this does is cleaning. But
3702 cleaning is already done on a per-master-thread basis (as the master
3703 server is shutting down cleanly, it has written all DROP TEMPORARY TABLE
3704 prepared statements' deletion are TODO only when we binlog prep stmts).
3706 We don't even increment mi->master_log_pos, because we may be just after
3707 a Rotate event. Btw, in a few milliseconds we are going to have a Start
3708 event from the next binlog (unless the master is presently running
3709 without --log-bin).
3711 goto err;
3712 case ROTATE_EVENT:
3714 Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue);
3715 if (unlikely(process_io_rotate(mi,&rev)))
3717 error= 1;
3718 goto err;
3721 Now the I/O thread has just changed its mi->master_log_name, so
3722 incrementing mi->master_log_pos is nonsense.
3724 inc_pos= 0;
3725 break;
3727 case FORMAT_DESCRIPTION_EVENT:
3730 Create an event, and save it (when we rotate the relay log, we will have
3731 to write this event again).
3734 We are the only thread which reads/writes description_event_for_queue.
3735 The relay_log struct does not move (though some members of it can
3736 change), so we needn't any lock (no rli->data_lock, no log lock).
3738 Format_description_log_event* tmp;
3739 const char* errmsg;
3740 if (!(tmp= (Format_description_log_event*)
3741 Log_event::read_log_event(buf, event_len, &errmsg,
3742 mi->rli.relay_log.description_event_for_queue)))
3744 error= 2;
3745 goto err;
3747 delete mi->rli.relay_log.description_event_for_queue;
3748 mi->rli.relay_log.description_event_for_queue= tmp;
3750 Though this does some conversion to the slave's format, this will
3751 preserve the master's binlog format version, and number of event types.
3754 If the event was not requested by the slave (the slave did not ask for
3755 it), i.e. has end_log_pos=0, we do not increment mi->master_log_pos
3757 inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
3758 DBUG_PRINT("info",("binlog format is now %d",
3759 mi->rli.relay_log.description_event_for_queue->binlog_version));
3762 break;
3763 default:
3764 inc_pos= event_len;
3765 break;
3769 If this event is originating from this server, don't queue it.
3770 We don't check this for 3.23 events because it's simpler like this; 3.23
3771 will be filtered anyway by the SQL slave thread which also tests the
3772 server id (we must also keep this test in the SQL thread, in case somebody
3773 upgrades a 4.0 slave which has a not-filtered relay log).
3775 ANY event coming from ourselves can be ignored: it is obvious for queries;
3776 for STOP_EVENT/ROTATE_EVENT/START_EVENT: these cannot come from ourselves
3777 (--log-slave-updates would not log that) unless this slave is also its
3778 direct master (an unsupported, useless setup!).
3781 pthread_mutex_lock(log_lock);
3783 if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) &&
3784 !mi->rli.replicate_same_server_id)
3787 Do not write it to the relay log.
3788 a) We still want to increment mi->master_log_pos, so that we won't
3789 re-read this event from the master if the slave IO thread is now
3790 stopped/restarted (more efficient if the events we are ignoring are big
3791 LOAD DATA INFILE).
3792 b) We want to record that we are skipping events, for the information of
3793 the slave SQL thread, otherwise that thread may let
3794 rli->group_relay_log_pos stay too small if the last binlog's event is
3795 ignored.
3796 But events which were generated by this slave and which do not exist in
3797 the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment
3798 mi->master_log_pos.
3800 if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT &&
3801 buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
3802 buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
3804 mi->master_log_pos+= inc_pos;
3805 memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
3806 DBUG_ASSERT(rli->ign_master_log_name_end[0]);
3807 rli->ign_master_log_pos_end= mi->master_log_pos;
3809 rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
3810 DBUG_PRINT("info", ("master_log_pos: %lu, event originating from the same server, ignored",
3811 (ulong) mi->master_log_pos));
3813 else
3815 /* write the event to the relay log */
3816 if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
3818 mi->master_log_pos+= inc_pos;
3819 DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
3820 rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3822 else
3823 error= 3;
3824 rli->ign_master_log_name_end[0]= 0; // last event is not ignored
3826 pthread_mutex_unlock(log_lock);
3829 err:
3830 pthread_mutex_unlock(&mi->data_lock);
3831 DBUG_PRINT("info", ("error: %d", error));
3832 DBUG_RETURN(error);
3836 void end_relay_log_info(Relay_log_info* rli)
3838 DBUG_ENTER("end_relay_log_info");
3840 if (!rli->inited)
3841 DBUG_VOID_RETURN;
3842 if (rli->info_fd >= 0)
3844 end_io_cache(&rli->info_file);
3845 (void) my_close(rli->info_fd, MYF(MY_WME));
3846 rli->info_fd = -1;
3848 if (rli->cur_log_fd >= 0)
3850 end_io_cache(&rli->cache_buf);
3851 (void)my_close(rli->cur_log_fd, MYF(MY_WME));
3852 rli->cur_log_fd = -1;
3854 rli->inited = 0;
3855 rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
3856 rli->relay_log.harvest_bytes_written(&rli->log_space_total);
3858 Delete the slave's temporary tables from memory.
3859 In the future there will be other actions than this, to ensure persistance
3860 of slave's temp tables after shutdown.
3862 rli->close_temporary_tables();
3863 DBUG_VOID_RETURN;
/**
  Hook to detach the active VIO before closing a connection handle.

  The client API might close the connection (and associated data)
  in case it encounters an unrecoverable (network) error. This hook
  is called from the client code before the VIO handle is deleted and
  allows the thread to detach the active vio so it does not point
  to freed memory.

  Other calls to THD::clear_active_vio throughout this module are
  redundant due to the hook but are left in place for illustrative
  purposes.

  The hook only acts when SIGNAL_WITH_VIO_CLOSE is defined and the current
  thread is a slave thread.
*/
extern "C" void slave_io_thread_detach_vio()
{
#ifdef SIGNAL_WITH_VIO_CLOSE
  THD *const slave_thd= current_thd;

  if (!slave_thd || !slave_thd->slave_thread)
    return;
  slave_thd->clear_active_vio();
#endif
}
3892 Try to connect until successful or slave killed
3894 SYNPOSIS
3895 safe_connect()
3896 thd Thread handler for slave
3897 mysql MySQL connection handle
3898 mi Replication handle
3900 RETURN
3901 0 ok
3902 # Error
3905 static int safe_connect(THD* thd, MYSQL* mysql, Master_info* mi)
3907 DBUG_ENTER("safe_connect");
3909 DBUG_RETURN(connect_to_master(thd, mysql, mi, 0, 0));
3914 SYNPOSIS
3915 connect_to_master()
3917 IMPLEMENTATION
3918 Try to connect until successful or slave killed or we have retried
3919 master_retry_count times
3922 static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
3923 bool reconnect, bool suppress_warnings)
3925 int slave_was_killed;
3926 int last_errno= -2; // impossible error
3927 ulong err_count=0;
3928 char llbuff[22];
3929 DBUG_ENTER("connect_to_master");
3931 #ifndef DBUG_OFF
3932 mi->events_till_disconnect = disconnect_slave_event_count;
3933 #endif
3934 ulong client_flag= CLIENT_REMEMBER_OPTIONS;
3935 if (opt_slave_compressed_protocol)
3936 client_flag=CLIENT_COMPRESS; /* We will use compression */
3938 mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
3939 mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
3941 #ifdef HAVE_OPENSSL
3942 if (mi->ssl)
3944 mysql_ssl_set(mysql,
3945 mi->ssl_key[0]?mi->ssl_key:0,
3946 mi->ssl_cert[0]?mi->ssl_cert:0,
3947 mi->ssl_ca[0]?mi->ssl_ca:0,
3948 mi->ssl_capath[0]?mi->ssl_capath:0,
3949 mi->ssl_cipher[0]?mi->ssl_cipher:0);
3950 mysql_options(mysql, MYSQL_OPT_SSL_VERIFY_SERVER_CERT,
3951 &mi->ssl_verify_server_cert);
3953 #endif
3955 mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
3956 /* This one is not strictly needed but we have it here for completeness */
3957 mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir);
3959 while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
3960 (reconnect ? mysql_reconnect(mysql) != 0 :
3961 mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0,
3962 mi->port, 0, client_flag) == 0))
3964 /* Don't repeat last error */
3965 if ((int)mysql_errno(mysql) != last_errno)
3967 last_errno=mysql_errno(mysql);
3968 suppress_warnings= 0;
3969 mi->report(ERROR_LEVEL, last_errno,
3970 "error %s to master '%s@%s:%d'"
3971 " - retry-time: %d retries: %lu",
3972 (reconnect ? "reconnecting" : "connecting"),
3973 mi->user, mi->host, mi->port,
3974 mi->connect_retry, master_retry_count);
3977 By default we try forever. The reason is that failure will trigger
3978 master election, so if the user did not set master_retry_count we
3979 do not want to have election triggered on the first failure to
3980 connect
3982 if (++err_count == master_retry_count)
3984 slave_was_killed=1;
3985 if (reconnect)
3986 change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER);
3987 break;
3989 safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
3990 (void*)mi);
3993 if (!slave_was_killed)
3995 mi->clear_error(); // clear possible left over reconnect error
3996 if (reconnect)
3998 if (!suppress_warnings && global_system_variables.log_warnings)
3999 sql_print_information("Slave: connected to master '%s@%s:%d',\
4000 replication resumed in log '%s' at position %s", mi->user,
4001 mi->host, mi->port,
4002 IO_RPL_LOG_NAME,
4003 llstr(mi->master_log_pos,llbuff));
4005 else
4007 change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
4008 general_log_print(thd, COM_CONNECT_OUT, "%s@%s:%d",
4009 mi->user, mi->host, mi->port);
4011 #ifdef SIGNAL_WITH_VIO_CLOSE
4012 thd->set_active_vio(mysql->net.vio);
4013 #endif
4015 mysql->reconnect= 1;
4016 DBUG_PRINT("exit",("slave_was_killed: %d", slave_was_killed));
4017 DBUG_RETURN(slave_was_killed);
4022 safe_reconnect()
4024 IMPLEMENTATION
4025 Try to connect until successful or slave killed or we have retried
4026 master_retry_count times
4029 static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
4030 bool suppress_warnings)
4032 DBUG_ENTER("safe_reconnect");
4033 DBUG_RETURN(connect_to_master(thd, mysql, mi, 1, suppress_warnings));
4038 Store the file and position where the execute-slave thread are in the
4039 relay log.
4041 SYNOPSIS
4042 flush_relay_log_info()
4043 rli Relay log information
4045 NOTES
4046 - As this is only called by the slave thread, we don't need to
4047 have a lock on this.
4048 - If there is an active transaction, then we don't update the position
4049 in the relay log. This is to ensure that we re-execute statements
4050 if we die in the middle of an transaction that was rolled back.
4051 - As a transaction never spans binary logs, we don't have to handle the
4052 case where we do a relay-log-rotation in the middle of the transaction.
4053 If this would not be the case, we would have to ensure that we
4054 don't delete the relay log file where the transaction started when
4055 we switch to a new relay log file.
4057 TODO
4058 - Change the log file information to a binary format to avoid calling
4059 longlong2str.
4061 RETURN VALUES
4062 0 ok
4063 1 write error
4066 bool flush_relay_log_info(Relay_log_info* rli)
4068 bool error=0;
4069 DBUG_ENTER("flush_relay_log_info");
4071 if (unlikely(rli->no_storage))
4072 DBUG_RETURN(0);
4074 IO_CACHE *file = &rli->info_file;
4075 char buff[FN_REFLEN*2+22*2+4], *pos;
4077 my_b_seek(file, 0L);
4078 pos=strmov(buff, rli->group_relay_log_name);
4079 *pos++='\n';
4080 pos=longlong2str(rli->group_relay_log_pos, pos, 10);
4081 *pos++='\n';
4082 pos=strmov(pos, rli->group_master_log_name);
4083 *pos++='\n';
4084 pos=longlong2str(rli->group_master_log_pos, pos, 10);
4085 *pos='\n';
4086 if (my_b_write(file, (uchar*) buff, (size_t) (pos-buff)+1))
4087 error=1;
4088 if (flush_io_cache(file))
4089 error=1;
4091 /* Flushing the relay log is done by the slave I/O thread */
4092 DBUG_RETURN(error);
4097 Called when we notice that the current "hot" log got rotated under our feet.
4100 static IO_CACHE *reopen_relay_log(Relay_log_info *rli, const char **errmsg)
4102 DBUG_ENTER("reopen_relay_log");
4103 DBUG_ASSERT(rli->cur_log != &rli->cache_buf);
4104 DBUG_ASSERT(rli->cur_log_fd == -1);
4106 IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
4107 if ((rli->cur_log_fd=open_binlog(cur_log,rli->event_relay_log_name,
4108 errmsg)) <0)
4109 DBUG_RETURN(0);
4111 We want to start exactly where we was before:
4112 relay_log_pos Current log pos
4113 pending Number of bytes already processed from the event
4115 rli->event_relay_log_pos= max(rli->event_relay_log_pos, BIN_LOG_HEADER_SIZE);
4116 my_b_seek(cur_log,rli->event_relay_log_pos);
4117 DBUG_RETURN(cur_log);
4122 Reads next event from the relay log. Should be called from the
4123 slave IO thread.
4125 @param rli Relay_log_info structure for the slave IO thread.
4127 @return The event read, or NULL on error. If an error occurs, the
4128 error is reported through the sql_print_information() or
4129 sql_print_error() functions.
4131 static Log_event* next_event(Relay_log_info* rli)
4133 Log_event* ev;
4134 IO_CACHE* cur_log = rli->cur_log;
4135 pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
4136 const char* errmsg=0;
4137 THD* thd = rli->sql_thd;
4138 DBUG_ENTER("next_event");
4140 DBUG_ASSERT(thd != 0);
4142 #ifndef DBUG_OFF
4143 if (abort_slave_event_count && !rli->events_till_abort--)
4144 DBUG_RETURN(0);
4145 #endif
4148 For most operations we need to protect rli members with data_lock,
4149 so we assume calling function acquired this mutex for us and we will
4150 hold it for the most of the loop below However, we will release it
4151 whenever it is worth the hassle, and in the cases when we go into a
4152 pthread_cond_wait() with the non-data_lock mutex
4154 safe_mutex_assert_owner(&rli->data_lock);
4156 while (!sql_slave_killed(thd,rli))
4159 We can have two kinds of log reading:
4160 hot_log:
4161 rli->cur_log points at the IO_CACHE of relay_log, which
4162 is actively being updated by the I/O thread. We need to be careful
4163 in this case and make sure that we are not looking at a stale log that
4164 has already been rotated. If it has been, we reopen the log.
4166 The other case is much simpler:
4167 We just have a read only log that nobody else will be updating.
4169 bool hot_log;
4170 if ((hot_log = (cur_log != &rli->cache_buf)))
4172 DBUG_ASSERT(rli->cur_log_fd == -1); // foreign descriptor
4173 pthread_mutex_lock(log_lock);
4176 Reading xxx_file_id is safe because the log will only
4177 be rotated when we hold relay_log.LOCK_log
4179 if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count)
4181 // The master has switched to a new log file; Reopen the old log file
4182 cur_log=reopen_relay_log(rli, &errmsg);
4183 pthread_mutex_unlock(log_lock);
4184 if (!cur_log) // No more log files
4185 goto err;
4186 hot_log=0; // Using old binary log
4190 As there is no guarantee that the relay is open (for example, an I/O
4191 error during a write by the slave I/O thread may have closed it), we
4192 have to test it.
4194 if (!my_b_inited(cur_log))
4195 goto err;
4196 #ifndef DBUG_OFF
4198 /* This is an assertion which sometimes fails, let's try to track it */
4199 char llbuf1[22], llbuf2[22];
4200 DBUG_PRINT("info", ("my_b_tell(cur_log)=%s rli->event_relay_log_pos=%s",
4201 llstr(my_b_tell(cur_log),llbuf1),
4202 llstr(rli->event_relay_log_pos,llbuf2)));
4203 DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
4204 DBUG_ASSERT(my_b_tell(cur_log) == rli->event_relay_log_pos);
4206 #endif
4208 Relay log is always in new format - if the master is 3.23, the
4209 I/O thread will convert the format for us.
4210 A problem: the description event may be in a previous relay log. So if
4211 the slave has been shutdown meanwhile, we would have to look in old relay
4212 logs, which may even have been deleted. So we need to write this
4213 description event at the beginning of the relay log.
4214 When the relay log is created when the I/O thread starts, easy: the
4215 master will send the description event and we will queue it.
4216 But if the relay log is created by new_file(): then the solution is:
4217 MYSQL_BIN_LOG::open() will write the buffered description event.
4219 if ((ev=Log_event::read_log_event(cur_log,0,
4220 rli->relay_log.description_event_for_exec)))
4223 DBUG_ASSERT(thd==rli->sql_thd);
4225 read it while we have a lock, to avoid a mutex lock in
4226 inc_event_relay_log_pos()
4228 rli->future_event_relay_log_pos= my_b_tell(cur_log);
4229 if (hot_log)
4230 pthread_mutex_unlock(log_lock);
4231 DBUG_RETURN(ev);
4233 DBUG_ASSERT(thd==rli->sql_thd);
4234 if (opt_reckless_slave) // For mysql-test
4235 cur_log->error = 0;
4236 if (cur_log->error < 0)
4238 errmsg = "slave SQL thread aborted because of I/O error";
4239 if (hot_log)
4240 pthread_mutex_unlock(log_lock);
4241 goto err;
4243 if (!cur_log->error) /* EOF */
4246 On a hot log, EOF means that there are no more updates to
4247 process and we must block until I/O thread adds some and
4248 signals us to continue
4250 if (hot_log)
4253 We say in Seconds_Behind_Master that we have "caught up". Note that
4254 for example if network link is broken but I/O slave thread hasn't
4255 noticed it (slave_net_timeout not elapsed), then we'll say "caught
4256 up" whereas we're not really caught up. Fixing that would require
4257 internally cutting timeout in smaller pieces in network read, no
4258 thanks. Another example: SQL has caught up on I/O, now I/O has read
4259 a new event and is queuing it; the false "0" will exist until SQL
4260 finishes executing the new event; it will be look abnormal only if
4261 the events have old timestamps (then you get "many", 0, "many").
4263 Transient phases like this can be fixed with implemeting
4264 Heartbeat event which provides the slave the status of the
4265 master at time the master does not have any new update to send.
4266 Seconds_Behind_Master would be zero only when master has no
4267 more updates in binlog for slave. The heartbeat can be sent
4268 in a (small) fraction of slave_net_timeout. Until it's done
4269 rli->last_master_timestamp is temporarely (for time of
4270 waiting for the following event) reset whenever EOF is
4271 reached.
4273 time_t save_timestamp= rli->last_master_timestamp;
4274 rli->last_master_timestamp= 0;
4276 DBUG_ASSERT(rli->relay_log.get_open_count() ==
4277 rli->cur_log_old_open_count);
4279 if (rli->ign_master_log_name_end[0])
4281 /* We generate and return a Rotate, to make our positions advance */
4282 DBUG_PRINT("info",("seeing an ignored end segment"));
4283 ev= new Rotate_log_event(rli->ign_master_log_name_end,
4284 0, rli->ign_master_log_pos_end,
4285 Rotate_log_event::DUP_NAME);
4286 rli->ign_master_log_name_end[0]= 0;
4287 pthread_mutex_unlock(log_lock);
4288 if (unlikely(!ev))
4290 errmsg= "Slave SQL thread failed to create a Rotate event "
4291 "(out of memory?), SHOW SLAVE STATUS may be inaccurate";
4292 goto err;
4294 ev->server_id= 0; // don't be ignored by slave SQL thread
4295 DBUG_RETURN(ev);
4299 We can, and should release data_lock while we are waiting for
4300 update. If we do not, show slave status will block
4302 pthread_mutex_unlock(&rli->data_lock);
4305 Possible deadlock :
4306 - the I/O thread has reached log_space_limit
4307 - the SQL thread has read all relay logs, but cannot purge for some
4308 reason:
4309 * it has already purged all logs except the current one
4310 * there are other logs than the current one but they're involved in
4311 a transaction that finishes in the current one (or is not finished)
4312 Solution :
4313 Wake up the possibly waiting I/O thread, and set a boolean asking
4314 the I/O thread to temporarily ignore the log_space_limit
4315 constraint, because we do not want the I/O thread to block because of
4316 space (it's ok if it blocks for any other reason (e.g. because the
4317 master does not send anything). Then the I/O thread stops waiting
4318 and reads one more event and starts honoring log_space_limit again.
4320 If the SQL thread needs more events to be able to rotate the log (it
4321 might need to finish the current group first), then it can ask for one
4322 more at a time. Thus we don't outgrow the relay log indefinitely,
4323 but rather in a controlled manner, until the next rotate.
4325 When the SQL thread starts it sets ignore_log_space_limit to false.
4326 We should also reset ignore_log_space_limit to 0 when the user does
4327 RESET SLAVE, but in fact, no need as RESET SLAVE requires that the slave
4328 be stopped, and the SQL thread sets ignore_log_space_limit to 0 when
4329 it stops.
4331 pthread_mutex_lock(&rli->log_space_lock);
4334 If we have reached the limit of the relay space and we
4335 are going to sleep, waiting for more events:
4337 1. If outside a group, SQL thread asks the IO thread
4338 to force a rotation so that the SQL thread purges
4339 logs next time it processes an event (thus space is
4340 freed).
4342 2. If in a group, SQL thread asks the IO thread to
4343 ignore the limit and queues yet one more event
4344 so that the SQL thread finishes the group and
4345 is are able to rotate and purge sometime soon.
4347 if (rli->log_space_limit &&
4348 rli->log_space_limit < rli->log_space_total)
4350 /* force rotation if not in an unfinished group */
4351 rli->sql_force_rotate_relay= !rli->is_in_group();
4353 /* ask for one more event */
4354 rli->ignore_log_space_limit= true;
4358 If the I/O thread is blocked, unblock it. Ok to broadcast
4359 after unlock, because the mutex is only destroyed in
4360 ~Relay_log_info(), i.e. when rli is destroyed, and rli will
4361 not be destroyed before we exit the present function.
4363 pthread_mutex_unlock(&rli->log_space_lock);
4364 pthread_cond_broadcast(&rli->log_space_cond);
4365 // Note that wait_for_update unlocks lock_log !
4366 rli->relay_log.wait_for_update(rli->sql_thd, 1);
4367 // re-acquire data lock since we released it earlier
4368 pthread_mutex_lock(&rli->data_lock);
4369 rli->last_master_timestamp= save_timestamp;
4370 continue;
4373 If the log was not hot, we need to move to the next log in
4374 sequence. The next log could be hot or cold, we deal with both
4375 cases separately after doing some common initialization
4377 end_io_cache(cur_log);
4378 DBUG_ASSERT(rli->cur_log_fd >= 0);
4379 my_close(rli->cur_log_fd, MYF(MY_WME));
4380 rli->cur_log_fd = -1;
4382 if (relay_log_purge)
4385 purge_first_log will properly set up relay log coordinates in rli.
4386 If the group's coordinates are equal to the event's coordinates
4387 (i.e. the relay log was not rotated in the middle of a group),
4388 we can purge this relay log too.
4389 We do ulonglong and string comparisons, this may be slow but
4390 - purging the last relay log is nice (it can save 1GB of disk), so we
4391 like to detect the case where we can do it, and given this,
4392 - I see no better detection method
4393 - purge_first_log is not called that often
4395 if (rli->relay_log.purge_first_log
4396 (rli,
4397 rli->group_relay_log_pos == rli->event_relay_log_pos
4398 && !strcmp(rli->group_relay_log_name,rli->event_relay_log_name)))
4400 errmsg = "Error purging processed logs";
4401 goto err;
4404 else
4407 If hot_log is set, then we already have a lock on
4408 LOCK_log. If not, we have to get the lock.
4410 According to Sasha, the only time this code will ever be executed
4411 is if we are recovering from a bug.
4413 if (rli->relay_log.find_next_log(&rli->linfo, !hot_log))
4415 errmsg = "error switching to the next log";
4416 goto err;
4418 rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
4419 strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
4420 sizeof(rli->event_relay_log_name)-1);
4421 flush_relay_log_info(rli);
4425 Now we want to open this next log. To know if it's a hot log (the one
4426 being written by the I/O thread now) or a cold log, we can use
4427 is_active(); if it is hot, we use the I/O cache; if it's cold we open
4428 the file normally. But if is_active() reports that the log is hot, this
4429 may change between the test and the consequence of the test. So we may
4430 open the I/O cache whereas the log is now cold, which is nonsense.
4431 To guard against this, we need to have LOCK_log.
4434 DBUG_PRINT("info",("hot_log: %d",hot_log));
4435 if (!hot_log) /* if hot_log, we already have this mutex */
4436 pthread_mutex_lock(log_lock);
4437 if (rli->relay_log.is_active(rli->linfo.log_file_name))
4439 #ifdef EXTRA_DEBUG
4440 if (global_system_variables.log_warnings)
4441 sql_print_information("next log '%s' is currently active",
4442 rli->linfo.log_file_name);
4443 #endif
4444 rli->cur_log= cur_log= rli->relay_log.get_log_file();
4445 rli->cur_log_old_open_count= rli->relay_log.get_open_count();
4446 DBUG_ASSERT(rli->cur_log_fd == -1);
4449 When the SQL thread is [stopped and] (re)started the
4450 following may happen:
4452 1. Log was hot at stop time and remains hot at restart
4454 SQL thread reads again from hot_log (SQL thread was
4455 reading from the active log when it was stopped and the
4456 very same log is still active on SQL thread restart).
4458 In this case, my_b_seek is performed on cur_log, while
4459 cur_log points to relay_log.get_log_file();
4461 2. Log was hot at stop time but got cold before restart
4463 The log was hot when SQL thread stopped, but it is not
4464 anymore when the SQL thread restarts.
4466 In this case, the SQL thread reopens the log, using
4467 cache_buf, ie, cur_log points to &cache_buf, and thence
4468 its coordinates are reset.
4470 3. Log was already cold at stop time
4472 The log was not hot when the SQL thread stopped, and, of
4473 course, it will not be hot when it restarts.
4475 In this case, the SQL thread opens the cold log again,
4476 using cache_buf, ie, cur_log points to &cache_buf, and
4477 thence its coordinates are reset.
4479 4. Log was hot at stop time, DBA changes to previous cold
4480 log and restarts SQL thread
4482 The log was hot when the SQL thread was stopped, but the
4483 user changed the coordinates of the SQL thread to
4484 restart from a previous cold log.
4486 In this case, at start time, cur_log points to a cold
4487 log, opened using &cache_buf as cache, and coordinates
4488 are reset. However, as it moves on to the next logs, it
4489 will eventually reach the hot log. If the hot log is the
4490 same at the time the SQL thread was stopped, then
4491 coordinates were not reset - the cur_log will point to
4492 relay_log.get_log_file(), and not a freshly opened
4493 IO_CACHE through cache_buf. For this reason we need to
4494 deploy a my_b_seek before calling check_binlog_magic at
4495 this point of the code (see: BUG#55263 for more
4496 details).
4498 NOTES:
4499 - We must keep the LOCK_log to read the 4 first bytes, as
4500 this is a hot log (same as when we call read_log_event()
4501 above: for a hot log we take the mutex).
4503 - Because of scenario #4 above, we need to have a
4504 my_b_seek here. Otherwise, we might hit the assertion
4505 inside check_binlog_magic.
4508 my_b_seek(cur_log, (my_off_t) 0);
4509 if (check_binlog_magic(cur_log,&errmsg))
4511 if (!hot_log) pthread_mutex_unlock(log_lock);
4512 goto err;
4514 if (!hot_log) pthread_mutex_unlock(log_lock);
4515 continue;
4517 if (!hot_log) pthread_mutex_unlock(log_lock);
4519 if we get here, the log was not hot, so we will have to open it
4520 ourselves. We are sure that the log is still not hot now (a log can get
4521 from hot to cold, but not from cold to hot). No need for LOCK_log.
4523 #ifdef EXTRA_DEBUG
4524 if (global_system_variables.log_warnings)
4525 sql_print_information("next log '%s' is not active",
4526 rli->linfo.log_file_name);
4527 #endif
4528 // open_binlog() will check the magic header
4529 if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
4530 &errmsg)) <0)
4531 goto err;
4533 else
4536 Read failed with a non-EOF error.
4537 TODO: come up with something better to handle this error
4539 if (hot_log)
4540 pthread_mutex_unlock(log_lock);
4541 sql_print_error("Slave SQL thread: I/O error reading \
4542 event(errno: %d cur_log->error: %d)",
4543 my_errno,cur_log->error);
4544 // set read position to the beginning of the event
4545 my_b_seek(cur_log,rli->event_relay_log_pos);
4546 /* otherwise, we have had a partial read */
4547 errmsg = "Aborting slave SQL thread because of partial event read";
4548 break; // To end of function
4551 if (!errmsg && global_system_variables.log_warnings)
4553 sql_print_information("Error reading relay log event: %s",
4554 "slave SQL thread was killed");
4555 DBUG_RETURN(0);
4558 err:
4559 if (errmsg)
4560 sql_print_error("Error reading relay log event: %s", errmsg);
4561 DBUG_RETURN(0);
4565 Rotate a relay log (this is used only by FLUSH LOGS; the automatic rotation
4566 because of size is simpler because when we do it we already have all relevant
4567 locks; here we don't, so this function is mainly taking locks).
4568 Returns nothing as we cannot catch any error (MYSQL_BIN_LOG::new_file()
4569 is void).
4572 int rotate_relay_log(Master_info* mi)
4574 DBUG_ENTER("rotate_relay_log");
4575 Relay_log_info* rli= &mi->rli;
4576 int error= 0;
4579 We need to test inited because otherwise, new_file() will attempt to lock
4580 LOCK_log, which may not be inited (if we're not a slave).
4582 if (!rli->inited)
4584 DBUG_PRINT("info", ("rli->inited == 0"));
4585 goto end;
4588 /* If the relay log is closed, new_file() will do nothing. */
4589 if ((error= rli->relay_log.new_file()))
4590 goto end;
4593 We harvest now, because otherwise BIN_LOG_HEADER_SIZE will not immediately
4594 be counted, so imagine a succession of FLUSH LOGS and assume the slave
4595 threads are started:
4596 relay_log_space decreases by the size of the deleted relay log, but does
4597 not increase, so flush-after-flush we may become negative, which is wrong.
4598 Even if this will be corrected as soon as a query is replicated on the
4599 slave (because the I/O thread will then call harvest_bytes_written() which
4600 will harvest all these BIN_LOG_HEADER_SIZE we forgot), it may give strange
4601 output in SHOW SLAVE STATUS meanwhile. So we harvest now.
4602 If the log is closed, then this will just harvest the last writes, probably
4603 0 as they probably have been harvested.
4605 rli->relay_log.harvest_bytes_written(&rli->log_space_total);
4606 end:
4607 DBUG_RETURN(error);
4612 Detects, based on master's version (as found in the relay log), if master
4613 has a certain bug.
4614 @param rli Relay_log_info which tells the master's version
4615 @param bug_id Number of the bug as found in bugs.mysql.com
4616 @param report bool report error message, default TRUE
4618 @param pred Predicate function that will be called with @c param to
4619 check for the bug. If the function return @c true, the bug is present,
4620 otherwise, it is not.
4622 @param param State passed to @c pred function.
4624 @return TRUE if master has the bug, FALSE if it does not.
4626 bool rpl_master_has_bug(const Relay_log_info *rli, uint bug_id, bool report,
4627 bool (*pred)(const void *), const void *param)
4629 struct st_version_range_for_one_bug {
4630 uint bug_id;
4631 const uchar introduced_in[3]; // first version with bug
4632 const uchar fixed_in[3]; // first version with fix
4634 static struct st_version_range_for_one_bug versions_for_all_bugs[]=
4636 {24432, { 5, 0, 24 }, { 5, 0, 38 } },
4637 {24432, { 5, 1, 12 }, { 5, 1, 17 } },
4638 {33029, { 5, 0, 0 }, { 5, 0, 58 } },
4639 {33029, { 5, 1, 0 }, { 5, 1, 12 } },
4640 {37426, { 5, 1, 0 }, { 5, 1, 26 } },
4642 const uchar *master_ver=
4643 rli->relay_log.description_event_for_exec->server_version_split;
4645 DBUG_ASSERT(sizeof(rli->relay_log.description_event_for_exec->server_version_split) == 3);
4647 for (uint i= 0;
4648 i < sizeof(versions_for_all_bugs)/sizeof(*versions_for_all_bugs);i++)
4650 const uchar *introduced_in= versions_for_all_bugs[i].introduced_in,
4651 *fixed_in= versions_for_all_bugs[i].fixed_in;
4652 if ((versions_for_all_bugs[i].bug_id == bug_id) &&
4653 (memcmp(introduced_in, master_ver, 3) <= 0) &&
4654 (memcmp(fixed_in, master_ver, 3) > 0) &&
4655 (pred == NULL || (*pred)(param)))
4657 if (!report)
4658 return TRUE;
4659 // a short message for SHOW SLAVE STATUS (message length constraints)
4660 my_printf_error(ER_UNKNOWN_ERROR, "master may suffer from"
4661 " http://bugs.mysql.com/bug.php?id=%u"
4662 " so slave stops; check error log on slave"
4663 " for more info", MYF(0), bug_id);
4664 // a verbose message for the error log
4665 rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
4666 "According to the master's version ('%s'),"
4667 " it is probable that master suffers from this bug:"
4668 " http://bugs.mysql.com/bug.php?id=%u"
4669 " and thus replicating the current binary log event"
4670 " may make the slave's data become different from the"
4671 " master's data."
4672 " To take no risk, slave refuses to replicate"
4673 " this event and stops."
4674 " We recommend that all updates be stopped on the"
4675 " master and slave, that the data of both be"
4676 " manually synchronized,"
4677 " that master's binary logs be deleted,"
4678 " that master be upgraded to a version at least"
4679 " equal to '%d.%d.%d'. Then replication can be"
4680 " restarted.",
4681 rli->relay_log.description_event_for_exec->server_version,
4682 bug_id,
4683 fixed_in[0], fixed_in[1], fixed_in[2]);
4684 return TRUE;
4687 return FALSE;
4691 BUG#33029, For all 5.0 up to 5.0.58 exclusive, and 5.1 up to 5.1.12
4692 exclusive, if one statement in a SP generated AUTO_INCREMENT value
4693 by the top statement, all statements after it would be considered
4694 generated AUTO_INCREMENT value by the top statement, and a
4695 erroneous INSERT_ID value might be associated with these statement,
4696 which could cause duplicate entry error and stop the slave.
4698 Detect buggy master to work around.
4700 bool rpl_master_erroneous_autoinc(THD *thd)
4702 if (active_mi && active_mi->rli.sql_thd == thd)
4704 Relay_log_info *rli= &active_mi->rli;
4705 DBUG_EXECUTE_IF("simulate_bug33029", return TRUE;);
4706 return rpl_master_has_bug(rli, 33029, FALSE, NULL, NULL);
4708 return FALSE;
/*
  Explicit instantiations of I_List_iterator for i_string and i_string_pair,
  compiled only when HAVE_EXPLICIT_TEMPLATE_INSTANTIATION is defined.
*/
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
template class I_List_iterator<i_string>;
template class I_List_iterator<i_string_pair>;
#endif
4717 @} (end of group Replication)
4720 #endif /* HAVE_REPLICATION */