mySQL 5.0.11 sources for tomato
[tomato.git] / release / src / router / mysql / sql / sql_insert.cc
bloba59e3fd14e9f7bfada0606b5e67989fd98961d55
1 /*
2 Copyright (c) 2000, 2012, Oracle and/or its affiliates. All rights reserved.
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 /* Insert of records */
21 INSERT DELAYED
23 Insert delayed is distinguished from a normal insert by lock_type ==
24 TL_WRITE_DELAYED instead of TL_WRITE. It first tries to open a
25 "delayed" table (delayed_get_table()), but falls back to
26 open_and_lock_tables() on error and proceeds as normal insert then.
28 Opening a "delayed" table means to find a delayed insert thread that
29 has the table open already. If this fails, a new thread is created and
30 waited for to open and lock the table.
32 If accessing the thread succeeded, in
33 Delayed_insert::get_local_table() the table of the thread is copied
34 for local use. A copy is required because the normal insert logic
35 works on a target table, but the other threads table object must not
36 be used. The insert logic uses the record buffer to create a record.
37 And the delayed insert thread uses the record buffer to pass the
38 record to the table handler. So there must be different objects. Also
39 the copied table is not included in the lock, so that the statement
40 can proceed even if the real table cannot be accessed at this moment.
42 Copying a table object is not a trivial operation. Besides the TABLE
43 object there are the field pointer array, the field objects and the
44 record buffer. After copying the field objects, their pointers into
45 the record must be "moved" to point to the new record buffer.
47 After this setup the normal insert logic is used. Only that for
48 delayed inserts write_delayed() is called instead of write_record().
49 It inserts the rows into a queue and signals the delayed insert thread
50 instead of writing directly to the table.
52 The delayed insert thread awakes from the signal. It locks the table,
53 inserts the rows from the queue, unlocks the table, and waits for the
54 next signal. It does normally live until a FLUSH TABLES or SHUTDOWN.
58 #include "mysql_priv.h"
59 #include "sp_head.h"
60 #include "sql_trigger.h"
61 #include "sql_select.h"
62 #include "sql_show.h"
63 #include "slave.h"
64 #include "rpl_mi.h"
66 #include "debug_sync.h"
68 #ifndef EMBEDDED_LIBRARY
69 static bool delayed_get_table(THD *thd, TABLE_LIST *table_list);
70 static int write_delayed(THD *thd, TABLE *table, enum_duplicates duplic,
71 LEX_STRING query, bool ignore, bool log_on);
72 static void end_delayed_insert(THD *thd);
73 pthread_handler_t handle_delayed_insert(void *arg);
74 static void unlink_blobs(register TABLE *table);
75 #endif
76 static bool check_view_insertability(THD *thd, TABLE_LIST *view);
78 /* Define to force use of my_malloc() if the allocated memory block is big */
80 #ifndef HAVE_ALLOCA
81 #define my_safe_alloca(size, min_length) my_alloca(size)
82 #define my_safe_afree(ptr, size, min_length) my_afree(ptr)
83 #else
84 #define my_safe_alloca(size, min_length) ((size <= min_length) ? my_alloca(size) : my_malloc(size,MYF(0)))
85 #define my_safe_afree(ptr, size, min_length) if (size > min_length) my_free(ptr,MYF(0))
86 #endif
89 Check that insert/update fields are from the same single table of a view.
91 SYNOPSIS
92 check_view_single_update()
93 fields The insert/update fields to be checked.
94 view The view for insert.
95 map [in/out] The insert table map.
97 DESCRIPTION
98 This function is called in 2 cases:
99 1. to check insert fields. In this case *map will be set to 0.
100 Insert fields are checked to be all from the same single underlying
101 table of the given view. Otherwise the error is thrown. Found table
102 map is returned in the map parameter.
103 2. to check update fields of the ON DUPLICATE KEY UPDATE clause.
104 In this case *map contains table_map found on the previous call of
105 the function to check insert fields. Update fields are checked to be
106 from the same table as the insert fields.
108 RETURN
109 0 OK
110 1 Error
113 bool check_view_single_update(List<Item> &fields, TABLE_LIST *view,
114 table_map *map)
116 /* it is join view => we need to find the table for update */
117 List_iterator_fast<Item> it(fields);
118 Item *item;
119 TABLE_LIST *tbl= 0; // reset for call to check_single_table()
120 table_map tables= 0;
122 while ((item= it++))
123 tables|= item->used_tables();
125 /* Check found map against provided map */
126 if (*map)
128 if (tables != *map)
129 goto error;
130 return FALSE;
133 if (view->check_single_table(&tbl, tables, view) || tbl == 0)
134 goto error;
136 view->table= tbl->table;
137 *map= tables;
139 return FALSE;
141 error:
142 my_error(ER_VIEW_MULTIUPDATE, MYF(0),
143 view->view_db.str, view->view_name.str);
144 return TRUE;
149 Check if insert fields are correct.
151 SYNOPSIS
152 check_insert_fields()
153 thd The current thread.
154 table The table for insert.
155 fields The insert fields.
156 values The insert values.
157 check_unique If duplicate values should be rejected.
159 NOTE
160 Clears TIMESTAMP_AUTO_SET_ON_INSERT from table->timestamp_field_type
161 or leaves it as is, depending on if timestamp should be updated or
162 not.
164 RETURN
165 0 OK
166 -1 Error
169 static int check_insert_fields(THD *thd, TABLE_LIST *table_list,
170 List<Item> &fields, List<Item> &values,
171 bool check_unique, table_map *map)
173 TABLE *table= table_list->table;
175 if (!table_list->updatable)
177 my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias, "INSERT");
178 return -1;
181 if (fields.elements == 0 && values.elements != 0)
183 if (!table)
185 my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
186 table_list->view_db.str, table_list->view_name.str);
187 return -1;
189 if (values.elements != table->s->fields)
191 my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
192 return -1;
194 #ifndef NO_EMBEDDED_ACCESS_CHECKS
195 Field_iterator_table_ref field_it;
196 field_it.set(table_list);
197 if (check_grant_all_columns(thd, INSERT_ACL, &field_it))
198 return -1;
199 #endif
200 clear_timestamp_auto_bits(table->timestamp_field_type,
201 TIMESTAMP_AUTO_SET_ON_INSERT);
203 No fields are provided so all fields must be provided in the values.
204 Thus we set all bits in the write set.
206 bitmap_set_all(table->write_set);
208 else
209 { // Part field list
210 SELECT_LEX *select_lex= &thd->lex->select_lex;
211 Name_resolution_context *context= &select_lex->context;
212 Name_resolution_context_state ctx_state;
213 int res;
215 if (fields.elements != values.elements)
217 my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
218 return -1;
221 thd->dup_field= 0;
222 select_lex->no_wrap_view_item= TRUE;
224 /* Save the state of the current name resolution context. */
225 ctx_state.save_state(context, table_list);
228 Perform name resolution only in the first table - 'table_list',
229 which is the table that is inserted into.
231 table_list->next_local= 0;
232 context->resolve_in_table_list_only(table_list);
233 res= setup_fields(thd, 0, fields, MARK_COLUMNS_WRITE, 0, 0);
235 /* Restore the current context. */
236 ctx_state.restore_state(context, table_list);
237 thd->lex->select_lex.no_wrap_view_item= FALSE;
239 if (res)
240 return -1;
242 if (table_list->effective_algorithm == VIEW_ALGORITHM_MERGE)
244 if (check_view_single_update(fields, table_list, map))
245 return -1;
246 table= table_list->table;
249 if (check_unique && thd->dup_field)
251 my_error(ER_FIELD_SPECIFIED_TWICE, MYF(0), thd->dup_field->field_name);
252 return -1;
254 if (table->timestamp_field) // Don't automaticly set timestamp if used
256 if (bitmap_is_set(table->write_set,
257 table->timestamp_field->field_index))
258 clear_timestamp_auto_bits(table->timestamp_field_type,
259 TIMESTAMP_AUTO_SET_ON_INSERT);
260 else
262 bitmap_set_bit(table->write_set,
263 table->timestamp_field->field_index);
267 // For the values we need select_priv
268 #ifndef NO_EMBEDDED_ACCESS_CHECKS
269 table->grant.want_privilege= (SELECT_ACL & ~table->grant.privilege);
270 #endif
272 if (check_key_in_view(thd, table_list) ||
273 (table_list->view &&
274 check_view_insertability(thd, table_list)))
276 my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias, "INSERT");
277 return -1;
280 return 0;
285 Check update fields for the timestamp field.
287 SYNOPSIS
288 check_update_fields()
289 thd The current thread.
290 insert_table_list The insert table list.
291 table The table for update.
292 update_fields The update fields.
294 NOTE
295 If the update fields include the timestamp field,
296 remove TIMESTAMP_AUTO_SET_ON_UPDATE from table->timestamp_field_type.
298 RETURN
299 0 OK
300 -1 Error
303 static int check_update_fields(THD *thd, TABLE_LIST *insert_table_list,
304 List<Item> &update_fields, table_map *map)
306 TABLE *table= insert_table_list->table;
307 my_bool timestamp_mark= 0;
309 if (table->timestamp_field)
312 Unmark the timestamp field so that we can check if this is modified
313 by update_fields
315 timestamp_mark= bitmap_test_and_clear(table->write_set,
316 table->timestamp_field->field_index);
319 /* Check the fields we are going to modify */
320 if (setup_fields(thd, 0, update_fields, MARK_COLUMNS_WRITE, 0, 0))
321 return -1;
323 if (insert_table_list->effective_algorithm == VIEW_ALGORITHM_MERGE &&
324 check_view_single_update(update_fields, insert_table_list, map))
325 return -1;
327 if (table->timestamp_field)
329 /* Don't set timestamp column if this is modified. */
330 if (bitmap_is_set(table->write_set,
331 table->timestamp_field->field_index))
332 clear_timestamp_auto_bits(table->timestamp_field_type,
333 TIMESTAMP_AUTO_SET_ON_UPDATE);
334 if (timestamp_mark)
335 bitmap_set_bit(table->write_set,
336 table->timestamp_field->field_index);
338 return 0;
342 Prepare triggers for INSERT-like statement.
344 SYNOPSIS
345 prepare_triggers_for_insert_stmt()
346 table Table to which insert will happen
348 NOTE
349 Prepare triggers for INSERT-like statement by marking fields
350 used by triggers and inform handlers that batching of UPDATE/DELETE
351 cannot be done if there are BEFORE UPDATE/DELETE triggers.
354 void prepare_triggers_for_insert_stmt(TABLE *table)
356 if (table->triggers)
358 if (table->triggers->has_triggers(TRG_EVENT_DELETE,
359 TRG_ACTION_AFTER))
362 The table has AFTER DELETE triggers that might access to
363 subject table and therefore might need delete to be done
364 immediately. So we turn-off the batching.
366 (void) table->file->extra(HA_EXTRA_DELETE_CANNOT_BATCH);
368 if (table->triggers->has_triggers(TRG_EVENT_UPDATE,
369 TRG_ACTION_AFTER))
372 The table has AFTER UPDATE triggers that might access to subject
373 table and therefore might need update to be done immediately.
374 So we turn-off the batching.
376 (void) table->file->extra(HA_EXTRA_UPDATE_CANNOT_BATCH);
379 table->mark_columns_needed_for_insert();
384 Upgrade table-level lock of INSERT statement to TL_WRITE if
385 a more concurrent lock is infeasible for some reason. This is
386 necessary for engines without internal locking support (MyISAM).
387 An engine with internal locking implementation might later
388 downgrade the lock in handler::store_lock() method.
391 static
392 void upgrade_lock_type(THD *thd, thr_lock_type *lock_type,
393 enum_duplicates duplic,
394 bool is_multi_insert)
396 if (duplic == DUP_UPDATE ||
397 (duplic == DUP_REPLACE && *lock_type == TL_WRITE_CONCURRENT_INSERT))
399 *lock_type= TL_WRITE_DEFAULT;
400 return;
403 if (*lock_type == TL_WRITE_DELAYED)
406 We do not use delayed threads if:
407 - we're running in the safe mode or skip-new mode -- the
408 feature is disabled in these modes
409 - we're executing this statement on a replication slave --
410 we need to ensure serial execution of queries on the
411 slave
412 - it is INSERT .. ON DUPLICATE KEY UPDATE - in this case the
413 insert cannot be concurrent
414 - this statement is directly or indirectly invoked from
415 a stored function or trigger (under pre-locking) - to
416 avoid deadlocks, since INSERT DELAYED involves a lock
417 upgrade (TL_WRITE_DELAYED -> TL_WRITE) which we should not
418 attempt while keeping other table level locks.
419 - this statement itself may require pre-locking.
420 We should upgrade the lock even though in most cases
421 delayed functionality may work. Unfortunately, we can't
422 easily identify whether the subject table is not used in
423 the statement indirectly via a stored function or trigger:
424 if it is used, that will lead to a deadlock between the
425 client connection and the delayed thread.
427 if (specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE) ||
428 thd->variables.max_insert_delayed_threads == 0 ||
429 thd->prelocked_mode ||
430 thd->lex->uses_stored_routines())
432 *lock_type= TL_WRITE;
433 return;
435 if (thd->slave_thread)
437 /* Try concurrent insert */
438 *lock_type= (duplic == DUP_UPDATE || duplic == DUP_REPLACE) ?
439 TL_WRITE : TL_WRITE_CONCURRENT_INSERT;
440 return;
443 bool log_on= (thd->options & OPTION_BIN_LOG ||
444 ! (thd->security_ctx->master_access & SUPER_ACL));
445 if (global_system_variables.binlog_format == BINLOG_FORMAT_STMT &&
446 log_on && mysql_bin_log.is_open() && is_multi_insert)
449 Statement-based binary logging does not work in this case, because:
450 a) two concurrent statements may have their rows intermixed in the
451 queue, leading to autoincrement replication problems on slave (because
452 the values generated used for one statement don't depend only on the
453 value generated for the first row of this statement, so are not
454 replicable)
455 b) if first row of the statement has an error the full statement is
456 not binlogged, while next rows of the statement may be inserted.
457 c) if first row succeeds, statement is binlogged immediately with a
458 zero error code (i.e. "no error"), if then second row fails, query
459 will fail on slave too and slave will stop (wrongly believing that the
460 master got no error).
461 So we fallback to non-delayed INSERT.
462 Note that to be fully correct, we should test the "binlog format which
463 the delayed thread is going to use for this row". But in the common case
464 where the global binlog format is not changed and the session binlog
465 format may be changed, that is equal to the global binlog format.
466 We test it without mutex for speed reasons (condition rarely true), and
467 in the common case (global not changed) it is as good as without mutex;
468 if global value is changed, anyway there is uncertainty as the delayed
469 thread may be old and use the before-the-change value.
471 *lock_type= TL_WRITE;
478 Find or create a delayed insert thread for the first table in
479 the table list, then open and lock the remaining tables.
480 If a table can not be used with insert delayed, upgrade the lock
481 and open and lock all tables using the standard mechanism.
483 @param thd thread context
484 @param table_list list of "descriptors" for tables referenced
485 directly in statement SQL text.
486 The first element in the list corresponds to
487 the destination table for inserts, remaining
488 tables, if any, are usually tables referenced
489 by sub-queries in the right part of the
490 INSERT.
492 @return Status of the operation. In case of success 'table'
493 member of every table_list element points to an instance of
494 class TABLE.
496 @sa open_and_lock_tables for more information about MySQL table
497 level locking
500 static
501 bool open_and_lock_for_insert_delayed(THD *thd, TABLE_LIST *table_list)
503 DBUG_ENTER("open_and_lock_for_insert_delayed");
505 #ifndef EMBEDDED_LIBRARY
506 if (thd->locked_tables && thd->global_read_lock)
509 If this connection has the global read lock, the handler thread
510 will not be able to lock the table. It will wait for the global
511 read lock to go away, but this will never happen since the
512 connection thread will be stuck waiting for the handler thread
513 to open and lock the table.
514 If we are not in locked tables mode, INSERT will seek protection
515 against the global read lock (and fail), thus we will only get
516 to this point in locked tables mode.
518 my_error(ER_CANT_UPDATE_WITH_READLOCK, MYF(0));
519 DBUG_RETURN(TRUE);
522 if (delayed_get_table(thd, table_list))
523 DBUG_RETURN(TRUE);
525 if (table_list->table)
528 Open tables used for sub-selects or in stored functions, will also
529 cache these functions.
531 if (open_and_lock_tables(thd, table_list->next_global))
533 end_delayed_insert(thd);
534 DBUG_RETURN(TRUE);
537 First table was not processed by open_and_lock_tables(),
538 we need to set updatability flag "by hand".
540 if (!table_list->derived && !table_list->view)
541 table_list->updatable= 1; // usual table
542 DBUG_RETURN(FALSE);
544 #endif
546 * This is embedded library and we don't have auxiliary
547 threads OR
548 * a lock upgrade was requested inside delayed_get_table
549 because
550 - there are too many delayed insert threads OR
551 - the table has triggers.
552 Use a normal insert.
554 table_list->lock_type= TL_WRITE;
555 DBUG_RETURN(open_and_lock_tables(thd, table_list));
560 INSERT statement implementation
562 @note Like implementations of other DDL/DML in MySQL, this function
563 relies on the caller to close the thread tables. This is done in the
564 end of dispatch_command().
567 bool mysql_insert(THD *thd,TABLE_LIST *table_list,
568 List<Item> &fields,
569 List<List_item> &values_list,
570 List<Item> &update_fields,
571 List<Item> &update_values,
572 enum_duplicates duplic,
573 bool ignore)
575 int error, res;
576 bool transactional_table, joins_freed= FALSE;
577 bool changed;
578 bool was_insert_delayed= (table_list->lock_type == TL_WRITE_DELAYED);
579 uint value_count;
580 ulong counter = 1;
581 ulonglong id;
582 COPY_INFO info;
583 TABLE *table= 0;
584 List_iterator_fast<List_item> its(values_list);
585 List_item *values;
586 Name_resolution_context *context;
587 Name_resolution_context_state ctx_state;
588 #ifndef EMBEDDED_LIBRARY
589 char *query= thd->query();
591 log_on is about delayed inserts only.
592 By default, both logs are enabled (this won't cause problems if the server
593 runs without --log-update or --log-bin).
595 bool log_on= ((thd->options & OPTION_BIN_LOG) ||
596 (!(thd->security_ctx->master_access & SUPER_ACL)));
597 #endif
598 thr_lock_type lock_type;
599 Item *unused_conds= 0;
600 DBUG_ENTER("mysql_insert");
603 Upgrade lock type if the requested lock is incompatible with
604 the current connection mode or table operation.
606 upgrade_lock_type(thd, &table_list->lock_type, duplic,
607 values_list.elements > 1);
610 We can't write-delayed into a table locked with LOCK TABLES:
611 this will lead to a deadlock, since the delayed thread will
612 never be able to get a lock on the table. QQQ: why not
613 upgrade the lock here instead?
615 if (table_list->lock_type == TL_WRITE_DELAYED && thd->locked_tables &&
616 find_locked_table(thd, table_list->db, table_list->table_name))
618 my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0),
619 table_list->table_name);
620 DBUG_RETURN(TRUE);
623 if (table_list->lock_type == TL_WRITE_DELAYED)
625 if (open_and_lock_for_insert_delayed(thd, table_list))
626 DBUG_RETURN(TRUE);
628 else
630 if (open_and_lock_tables(thd, table_list))
631 DBUG_RETURN(TRUE);
633 lock_type= table_list->lock_type;
635 thd_proc_info(thd, "init");
636 thd->lex->used_tables=0;
637 values= its++;
638 value_count= values->elements;
640 if (mysql_prepare_insert(thd, table_list, table, fields, values,
641 update_fields, update_values, duplic, &unused_conds,
642 FALSE,
643 (fields.elements || !value_count ||
644 table_list->view != 0),
645 !ignore && (thd->variables.sql_mode &
646 (MODE_STRICT_TRANS_TABLES |
647 MODE_STRICT_ALL_TABLES))))
648 goto abort;
650 /* mysql_prepare_insert set table_list->table if it was not set */
651 table= table_list->table;
653 context= &thd->lex->select_lex.context;
655 These three asserts test the hypothesis that the resetting of the name
656 resolution context below is not necessary at all since the list of local
657 tables for INSERT always consists of one table.
659 DBUG_ASSERT(!table_list->next_local);
660 DBUG_ASSERT(!context->table_list->next_local);
661 DBUG_ASSERT(!context->first_name_resolution_table->next_name_resolution_table);
663 /* Save the state of the current name resolution context. */
664 ctx_state.save_state(context, table_list);
667 Perform name resolution only in the first table - 'table_list',
668 which is the table that is inserted into.
670 table_list->next_local= 0;
671 context->resolve_in_table_list_only(table_list);
673 while ((values= its++))
675 counter++;
676 if (values->elements != value_count)
678 my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
679 goto abort;
681 if (setup_fields(thd, 0, *values, MARK_COLUMNS_READ, 0, 0))
682 goto abort;
684 its.rewind ();
686 /* Restore the current context. */
687 ctx_state.restore_state(context, table_list);
690 Fill in the given fields and dump it to the table file
692 bzero((char*) &info,sizeof(info));
693 info.ignore= ignore;
694 info.handle_duplicates=duplic;
695 info.update_fields= &update_fields;
696 info.update_values= &update_values;
697 info.view= (table_list->view ? table_list : 0);
700 Count warnings for all inserts.
701 For single line insert, generate an error if try to set a NOT NULL field
702 to NULL.
704 thd->count_cuted_fields= ((values_list.elements == 1 &&
705 !ignore) ?
706 CHECK_FIELD_ERROR_FOR_NULL :
707 CHECK_FIELD_WARN);
708 thd->cuted_fields = 0L;
709 table->next_number_field=table->found_next_number_field;
711 #ifdef HAVE_REPLICATION
712 if (thd->slave_thread &&
713 (info.handle_duplicates == DUP_UPDATE) &&
714 (table->next_number_field != NULL) &&
715 rpl_master_has_bug(&active_mi->rli, 24432, TRUE, NULL, NULL))
716 goto abort;
717 #endif
719 error=0;
720 thd_proc_info(thd, "update");
721 if (duplic == DUP_REPLACE &&
722 (!table->triggers || !table->triggers->has_delete_triggers()))
723 table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
724 if (duplic == DUP_UPDATE)
725 table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
727 let's *try* to start bulk inserts. It won't necessary
728 start them as values_list.elements should be greater than
729 some - handler dependent - threshold.
730 We should not start bulk inserts if this statement uses
731 functions or invokes triggers since they may access
732 to the same table and therefore should not see its
733 inconsistent state created by this optimization.
734 So we call start_bulk_insert to perform nesessary checks on
735 values_list.elements, and - if nothing else - to initialize
736 the code to make the call of end_bulk_insert() below safe.
738 #ifndef EMBEDDED_LIBRARY
739 if (lock_type != TL_WRITE_DELAYED)
740 #endif /* EMBEDDED_LIBRARY */
742 if (duplic != DUP_ERROR || ignore)
743 table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
744 if (!thd->prelocked_mode)
745 table->file->ha_start_bulk_insert(values_list.elements);
748 thd->abort_on_warning= (!ignore && (thd->variables.sql_mode &
749 (MODE_STRICT_TRANS_TABLES |
750 MODE_STRICT_ALL_TABLES)));
752 prepare_triggers_for_insert_stmt(table);
755 if (table_list->prepare_where(thd, 0, TRUE) ||
756 table_list->prepare_check_option(thd))
757 error= 1;
759 while ((values= its++))
761 if (fields.elements || !value_count)
763 restore_record(table,s->default_values); // Get empty record
764 if (fill_record_n_invoke_before_triggers(thd, fields, *values, 0,
765 table->triggers,
766 TRG_EVENT_INSERT))
768 if (values_list.elements != 1 && ! thd->is_error())
770 info.records++;
771 continue;
774 TODO: set thd->abort_on_warning if values_list.elements == 1
775 and check that all items return warning in case of problem with
776 storing field.
778 error=1;
779 break;
782 else
784 if (thd->lex->used_tables) // Column used in values()
785 restore_record(table,s->default_values); // Get empty record
786 else
788 TABLE_SHARE *share= table->s;
791 Fix delete marker. No need to restore rest of record since it will
792 be overwritten by fill_record() anyway (and fill_record() does not
793 use default values in this case).
795 table->record[0][0]= share->default_values[0];
797 /* Fix undefined null_bits. */
798 if (share->null_bytes > 1 && share->last_null_bit_pos)
800 table->record[0][share->null_bytes - 1]=
801 share->default_values[share->null_bytes - 1];
804 if (fill_record_n_invoke_before_triggers(thd, table->field, *values, 0,
805 table->triggers,
806 TRG_EVENT_INSERT))
808 if (values_list.elements != 1 && ! thd->is_error())
810 info.records++;
811 continue;
813 error=1;
814 break;
818 if ((res= table_list->view_check_option(thd,
819 (values_list.elements == 1 ?
821 ignore))) ==
822 VIEW_CHECK_SKIP)
823 continue;
824 else if (res == VIEW_CHECK_ERROR)
826 error= 1;
827 break;
829 #ifndef EMBEDDED_LIBRARY
830 if (lock_type == TL_WRITE_DELAYED)
832 LEX_STRING const st_query = { query, thd->query_length() };
833 error=write_delayed(thd, table, duplic, st_query, ignore, log_on);
834 query=0;
836 else
837 #endif
838 error=write_record(thd, table ,&info);
839 if (error)
840 break;
841 thd->row_count++;
844 free_underlaid_joins(thd, &thd->lex->select_lex);
845 joins_freed= TRUE;
848 Now all rows are inserted. Time to update logs and sends response to
849 user
851 #ifndef EMBEDDED_LIBRARY
852 if (lock_type == TL_WRITE_DELAYED)
854 if (!error)
856 info.copied=values_list.elements;
857 end_delayed_insert(thd);
860 else
861 #endif
864 Do not do this release if this is a delayed insert, it would steal
865 auto_inc values from the delayed_insert thread as they share TABLE.
867 table->file->ha_release_auto_increment();
868 if (!thd->prelocked_mode && table->file->ha_end_bulk_insert() && !error)
870 table->file->print_error(my_errno,MYF(0));
871 error=1;
873 if (duplic != DUP_ERROR || ignore)
874 table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
876 transactional_table= table->file->has_transactions();
878 if ((changed= (info.copied || info.deleted || info.updated)))
881 Invalidate the table in the query cache if something changed.
882 For the transactional algorithm to work the invalidation must be
883 before binlog writing and ha_autocommit_or_rollback
885 query_cache_invalidate3(thd, table_list, 1);
887 if (error <= 0 ||
888 thd->transaction.stmt.modified_non_trans_table ||
889 was_insert_delayed)
891 if (mysql_bin_log.is_open())
893 int errcode= 0;
894 if (error <= 0)
897 [Guilhem wrote] Temporary errors may have filled
898 thd->net.last_error/errno. For example if there has
899 been a disk full error when writing the row, and it was
900 MyISAM, then thd->net.last_error/errno will be set to
901 "disk full"... and the my_pwrite() will wait until free
902 space appears, and so when it finishes then the
903 write_row() was entirely successful
905 /* todo: consider removing */
906 thd->clear_error();
908 else
909 errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
911 /* bug#22725:
913 A query which per-row-loop can not be interrupted with
914 KILLED, like INSERT, and that does not invoke stored
915 routines can be binlogged with neglecting the KILLED error.
917 If there was no error (error == zero) until after the end of
918 inserting loop the KILLED flag that appeared later can be
919 disregarded since previously possible invocation of stored
920 routines did not result in any error due to the KILLED. In
921 such case the flag is ignored for constructing binlog event.
923 DBUG_ASSERT(thd->killed != THD::KILL_BAD_DATA || error > 0);
924 if (thd->binlog_query(THD::ROW_QUERY_TYPE,
925 thd->query(), thd->query_length(),
926 transactional_table, FALSE,
927 errcode))
929 error=1;
932 if (thd->transaction.stmt.modified_non_trans_table)
933 thd->transaction.all.modified_non_trans_table= TRUE;
935 DBUG_ASSERT(transactional_table || !changed ||
936 thd->transaction.stmt.modified_non_trans_table);
938 thd_proc_info(thd, "end");
940 We'll report to the client this id:
941 - if the table contains an autoincrement column and we successfully
942 inserted an autogenerated value, the autogenerated value.
943 - if the table contains no autoincrement column and LAST_INSERT_ID(X) was
944 called, X.
945 - if the table contains an autoincrement column, and some rows were
946 inserted, the id of the last "inserted" row (if IGNORE, that value may not
947 have been really inserted but ignored).
949 id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
950 thd->first_successful_insert_id_in_cur_stmt :
951 (thd->arg_of_last_insert_id_function ?
952 thd->first_successful_insert_id_in_prev_stmt :
953 ((table->next_number_field && info.copied) ?
954 table->next_number_field->val_int() : 0));
955 table->next_number_field=0;
956 thd->count_cuted_fields= CHECK_FIELD_IGNORE;
957 table->auto_increment_field_not_null= FALSE;
958 if (duplic == DUP_REPLACE &&
959 (!table->triggers || !table->triggers->has_delete_triggers()))
960 table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
962 if (error)
963 goto abort;
964 if (values_list.elements == 1 && (!(thd->options & OPTION_WARNINGS) ||
965 !thd->cuted_fields))
967 thd->row_count_func= info.copied + info.deleted +
968 ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
969 info.touched : info.updated);
970 my_ok(thd, (ulong) thd->row_count_func, id);
972 else
974 char buff[160];
975 ha_rows updated=((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
976 info.touched : info.updated);
977 if (ignore)
978 sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
979 (lock_type == TL_WRITE_DELAYED) ? (ulong) 0 :
980 (ulong) (info.records - info.copied), (ulong) thd->cuted_fields);
981 else
982 sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
983 (ulong) (info.deleted + updated), (ulong) thd->cuted_fields);
984 thd->row_count_func= info.copied + info.deleted + updated;
985 ::my_ok(thd, (ulong) thd->row_count_func, id, buff);
987 thd->abort_on_warning= 0;
988 DBUG_RETURN(FALSE);
990 abort:
991 #ifndef EMBEDDED_LIBRARY
992 if (lock_type == TL_WRITE_DELAYED)
993 end_delayed_insert(thd);
994 #endif
995 if (table != NULL)
996 table->file->ha_release_auto_increment();
997 if (!joins_freed)
998 free_underlaid_joins(thd, &thd->lex->select_lex);
999 thd->abort_on_warning= 0;
1000 DBUG_RETURN(TRUE);
1005 Additional check for insertability for VIEW
1007 SYNOPSIS
1008 check_view_insertability()
1009 thd - thread handler
1010 view - reference on VIEW
1012 IMPLEMENTATION
1013 A view is insertable if the folloings are true:
1014 - All columns in the view are columns from a table
1015 - All not used columns in table have a default values
1016 - All field in view are unique (not referring to the same column)
1018 RETURN
1019 FALSE - OK
1020 view->contain_auto_increment is 1 if and only if the view contains an
1021 auto_increment field
1023 TRUE - can't be used for insert
1026 static bool check_view_insertability(THD * thd, TABLE_LIST *view)
1028 uint num= view->view->select_lex.item_list.elements;
1029 TABLE *table= view->table;
1030 Field_translator *trans_start= view->field_translation,
1031 *trans_end= trans_start + num;
1032 Field_translator *trans;
1033 uint used_fields_buff_size= bitmap_buffer_size(table->s->fields);
1034 uint32 *used_fields_buff= (uint32*)thd->alloc(used_fields_buff_size);
1035 MY_BITMAP used_fields;
1036 enum_mark_columns save_mark_used_columns= thd->mark_used_columns;
1037 DBUG_ENTER("check_key_in_view");
1039 if (!used_fields_buff)
1040 DBUG_RETURN(TRUE); // EOM
1042 DBUG_ASSERT(view->table != 0 && view->field_translation != 0);
1044 VOID(bitmap_init(&used_fields, used_fields_buff, table->s->fields, 0));
1045 bitmap_clear_all(&used_fields);
1047 view->contain_auto_increment= 0;
1049 we must not set query_id for fields as they're not
1050 really used in this context
1052 thd->mark_used_columns= MARK_COLUMNS_NONE;
1053 /* check simplicity and prepare unique test of view */
1054 for (trans= trans_start; trans != trans_end; trans++)
1056 if (!trans->item->fixed && trans->item->fix_fields(thd, &trans->item))
1058 thd->mark_used_columns= save_mark_used_columns;
1059 DBUG_RETURN(TRUE);
1061 Item_field *field;
1062 /* simple SELECT list entry (field without expression) */
1063 if (!(field= trans->item->filed_for_view_update()))
1065 thd->mark_used_columns= save_mark_used_columns;
1066 DBUG_RETURN(TRUE);
1068 if (field->field->unireg_check == Field::NEXT_NUMBER)
1069 view->contain_auto_increment= 1;
1070 /* prepare unique test */
1072 remove collation (or other transparent for update function) if we have
1075 trans->item= field;
1077 thd->mark_used_columns= save_mark_used_columns;
1078 /* unique test */
1079 for (trans= trans_start; trans != trans_end; trans++)
1081 /* Thanks to test above, we know that all columns are of type Item_field */
1082 Item_field *field= (Item_field *)trans->item;
1083 /* check fields belong to table in which we are inserting */
1084 if (field->field->table == table &&
1085 bitmap_fast_test_and_set(&used_fields, field->field->field_index))
1086 DBUG_RETURN(TRUE);
1089 DBUG_RETURN(FALSE);
1094 Check if table can be updated
1096 SYNOPSIS
1097 mysql_prepare_insert_check_table()
1098 thd Thread handle
1099 table_list Table list
1100 fields List of fields to be updated
1101 where Pointer to where clause
1102 select_insert Check is making for SELECT ... INSERT
1104 RETURN
1105 FALSE ok
1106 TRUE ERROR
1109 static bool mysql_prepare_insert_check_table(THD *thd, TABLE_LIST *table_list,
1110 List<Item> &fields,
1111 bool select_insert)
1113 bool insert_into_view= (table_list->view != 0);
1114 DBUG_ENTER("mysql_prepare_insert_check_table");
1117 first table in list is the one we'll INSERT into, requires INSERT_ACL.
1118 all others require SELECT_ACL only. the ACL requirement below is for
1119 new leaves only anyway (view-constituents), so check for SELECT rather
1120 than INSERT.
1123 if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context,
1124 &thd->lex->select_lex.top_join_list,
1125 table_list,
1126 &thd->lex->select_lex.leaf_tables,
1127 select_insert, INSERT_ACL, SELECT_ACL))
1128 DBUG_RETURN(TRUE);
1130 if (insert_into_view && !fields.elements)
1132 thd->lex->empty_field_list_on_rset= 1;
1133 if (!table_list->table)
1135 my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
1136 table_list->view_db.str, table_list->view_name.str);
1137 DBUG_RETURN(TRUE);
1139 DBUG_RETURN(insert_view_fields(thd, &fields, table_list));
1142 DBUG_RETURN(FALSE);
1147 Get extra info for tables we insert into
1149 @param table table(TABLE object) we insert into,
1150 might be NULL in case of view
1151 @param table(TABLE_LIST object) or view we insert into
1154 static void prepare_for_positional_update(TABLE *table, TABLE_LIST *tables)
1156 if (table)
1158 if(table->reginfo.lock_type != TL_WRITE_DELAYED)
1159 table->prepare_for_position();
1160 return;
1163 DBUG_ASSERT(tables->view);
1164 List_iterator<TABLE_LIST> it(*tables->view_tables);
1165 TABLE_LIST *tbl;
1166 while ((tbl= it++))
1167 prepare_for_positional_update(tbl->table, tbl);
1169 return;
1174 Prepare items in INSERT statement
1176 SYNOPSIS
1177 mysql_prepare_insert()
1178 thd Thread handler
1179 table_list Global/local table list
1180 table Table to insert into (can be NULL if table should
1181 be taken from table_list->table)
1182 where Where clause (for insert ... select)
1183 select_insert TRUE if INSERT ... SELECT statement
1184 check_fields TRUE if need to check that all INSERT fields are
1185 given values.
1186 abort_on_warning whether to report if some INSERT field is not
1187 assigned as an error (TRUE) or as a warning (FALSE).
1189 TODO (in far future)
1190 In cases of:
1191 INSERT INTO t1 SELECT a, sum(a) as sum1 from t2 GROUP BY a
1192 ON DUPLICATE KEY ...
1193 we should be able to refer to sum1 in the ON DUPLICATE KEY part
1195 WARNING
1196 You MUST set table->insert_values to 0 after calling this function
1197 before releasing the table object.
1199 RETURN VALUE
1200 FALSE OK
1201 TRUE error
1204 bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
1205 TABLE *table, List<Item> &fields, List_item *values,
1206 List<Item> &update_fields, List<Item> &update_values,
1207 enum_duplicates duplic,
1208 COND **where, bool select_insert,
1209 bool check_fields, bool abort_on_warning)
1211 SELECT_LEX *select_lex= &thd->lex->select_lex;
1212 Name_resolution_context *context= &select_lex->context;
1213 Name_resolution_context_state ctx_state;
1214 bool insert_into_view= (table_list->view != 0);
1215 bool res= 0;
1216 table_map map= 0;
1217 DBUG_ENTER("mysql_prepare_insert");
1218 DBUG_PRINT("enter", ("table_list 0x%lx, table 0x%lx, view %d",
1219 (ulong)table_list, (ulong)table,
1220 (int)insert_into_view));
1221 /* INSERT should have a SELECT or VALUES clause */
1222 DBUG_ASSERT (!select_insert || !values);
1225 For subqueries in VALUES() we should not see the table in which we are
1226 inserting (for INSERT ... SELECT this is done by changing table_list,
1227 because INSERT ... SELECT share SELECT_LEX it with SELECT.
1229 if (!select_insert)
1231 for (SELECT_LEX_UNIT *un= select_lex->first_inner_unit();
1233 un= un->next_unit())
1235 for (SELECT_LEX *sl= un->first_select();
1237 sl= sl->next_select())
1239 sl->context.outer_context= 0;
1244 if (duplic == DUP_UPDATE)
1246 /* it should be allocated before Item::fix_fields() */
1247 if (table_list->set_insert_values(thd->mem_root))
1248 DBUG_RETURN(TRUE);
1251 if (mysql_prepare_insert_check_table(thd, table_list, fields, select_insert))
1252 DBUG_RETURN(TRUE);
1255 /* Prepare the fields in the statement. */
1256 if (values)
1258 /* if we have INSERT ... VALUES () we cannot have a GROUP BY clause */
1259 DBUG_ASSERT (!select_lex->group_list.elements);
1261 /* Save the state of the current name resolution context. */
1262 ctx_state.save_state(context, table_list);
1265 Perform name resolution only in the first table - 'table_list',
1266 which is the table that is inserted into.
1268 table_list->next_local= 0;
1269 context->resolve_in_table_list_only(table_list);
1271 res= check_insert_fields(thd, context->table_list, fields, *values,
1272 !insert_into_view, &map) ||
1273 setup_fields(thd, 0, *values, MARK_COLUMNS_READ, 0, 0);
1275 if (!res && check_fields)
1277 bool saved_abort_on_warning= thd->abort_on_warning;
1278 thd->abort_on_warning= abort_on_warning;
1279 res= check_that_all_fields_are_given_values(thd,
1280 table ? table :
1281 context->table_list->table,
1282 context->table_list);
1283 thd->abort_on_warning= saved_abort_on_warning;
1286 if (!res && duplic == DUP_UPDATE)
1288 select_lex->no_wrap_view_item= TRUE;
1289 res= check_update_fields(thd, context->table_list, update_fields, &map);
1290 select_lex->no_wrap_view_item= FALSE;
1293 /* Restore the current context. */
1294 ctx_state.restore_state(context, table_list);
1296 if (!res)
1297 res= setup_fields(thd, 0, update_values, MARK_COLUMNS_READ, 0, 0);
1300 if (res)
1301 DBUG_RETURN(res);
1303 if (!table)
1304 table= table_list->table;
1306 if (!select_insert)
1308 Item *fake_conds= 0;
1309 TABLE_LIST *duplicate;
1310 if ((duplicate= unique_table(thd, table_list, table_list->next_global, 1)))
1312 update_non_unique_table_error(table_list, "INSERT", duplicate);
1313 DBUG_RETURN(TRUE);
1315 select_lex->fix_prepare_information(thd, &fake_conds, &fake_conds);
1316 select_lex->first_execution= 0;
1319 Only call prepare_for_posistion() if we are not performing a DELAYED
1320 operation. It will instead be executed by delayed insert thread.
1322 if (duplic == DUP_UPDATE || duplic == DUP_REPLACE)
1323 prepare_for_positional_update(table, table_list);
1324 DBUG_RETURN(FALSE);
1328 /* Check if there is more uniq keys after field */
1330 static int last_uniq_key(TABLE *table,uint keynr)
1332 while (++keynr < table->s->keys)
1333 if (table->key_info[keynr].flags & HA_NOSAME)
1334 return 0;
1335 return 1;
1340 Write a record to table with optional deleting of conflicting records,
1341 invoke proper triggers if needed.
1343 SYNOPSIS
1344 write_record()
1345 thd - thread context
1346 table - table to which record should be written
1347 info - COPY_INFO structure describing handling of duplicates
1348 and which is used for counting number of records inserted
1349 and deleted.
1351 NOTE
1352 Once this record will be written to table after insert trigger will
1353 be invoked. If instead of inserting new record we will update old one
1354 then both on update triggers will work instead. Similarly both on
1355 delete triggers will be invoked if we will delete conflicting records.
1357 Sets thd->transaction.stmt.modified_non_trans_table to TRUE if table which is updated didn't have
1358 transactions.
1360 RETURN VALUE
1361 0 - success
1362 non-0 - error
1366 int write_record(THD *thd, TABLE *table,COPY_INFO *info)
1368 int error, trg_error= 0;
1369 char *key=0;
1370 MY_BITMAP *save_read_set, *save_write_set;
1371 ulonglong prev_insert_id= table->file->next_insert_id;
1372 ulonglong insert_id_for_cur_row= 0;
1373 DBUG_ENTER("write_record");
1375 info->records++;
1376 save_read_set= table->read_set;
1377 save_write_set= table->write_set;
1379 if (info->handle_duplicates == DUP_REPLACE ||
1380 info->handle_duplicates == DUP_UPDATE)
1382 while ((error=table->file->ha_write_row(table->record[0])))
1384 uint key_nr;
1386 If we do more than one iteration of this loop, from the second one the
1387 row will have an explicit value in the autoinc field, which was set at
1388 the first call of handler::update_auto_increment(). So we must save
1389 the autogenerated value to avoid thd->insert_id_for_cur_row to become
1392 if (table->file->insert_id_for_cur_row > 0)
1393 insert_id_for_cur_row= table->file->insert_id_for_cur_row;
1394 else
1395 table->file->insert_id_for_cur_row= insert_id_for_cur_row;
1396 bool is_duplicate_key_error;
1397 if (table->file->is_fatal_error(error, HA_CHECK_DUP))
1398 goto err;
1399 is_duplicate_key_error= table->file->is_fatal_error(error, 0);
1400 if (!is_duplicate_key_error)
1403 We come here when we had an ignorable error which is not a duplicate
1404 key error. In this we ignore error if ignore flag is set, otherwise
1405 report error as usual. We will not do any duplicate key processing.
1407 if (info->ignore)
1408 goto ok_or_after_trg_err; /* Ignoring a not fatal error, return 0 */
1409 goto err;
1411 if ((int) (key_nr = table->file->get_dup_key(error)) < 0)
1413 error= HA_ERR_FOUND_DUPP_KEY; /* Database can't find key */
1414 goto err;
1416 DEBUG_SYNC(thd, "write_row_replace");
1418 /* Read all columns for the row we are going to replace */
1419 table->use_all_columns();
1421 Don't allow REPLACE to replace a row when a auto_increment column
1422 was used. This ensures that we don't get a problem when the
1423 whole range of the key has been used.
1425 if (info->handle_duplicates == DUP_REPLACE &&
1426 table->next_number_field &&
1427 key_nr == table->s->next_number_index &&
1428 (insert_id_for_cur_row > 0))
1429 goto err;
1430 if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
1432 if (table->file->rnd_pos(table->record[1],table->file->dup_ref))
1433 goto err;
1435 else
1437 if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
1439 error=my_errno;
1440 goto err;
1443 if (!key)
1445 if (!(key=(char*) my_safe_alloca(table->s->max_unique_length,
1446 MAX_KEY_LENGTH)))
1448 error=ENOMEM;
1449 goto err;
1452 key_copy((uchar*) key,table->record[0],table->key_info+key_nr,0);
1453 if ((error=(table->file->index_read_idx_map(table->record[1],key_nr,
1454 (uchar*) key, HA_WHOLE_KEY,
1455 HA_READ_KEY_EXACT))))
1456 goto err;
1458 if (info->handle_duplicates == DUP_UPDATE)
1460 int res= 0;
1462 We don't check for other UNIQUE keys - the first row
1463 that matches, is updated. If update causes a conflict again,
1464 an error is returned
1466 DBUG_ASSERT(table->insert_values != NULL);
1467 store_record(table,insert_values);
1468 restore_record(table,record[1]);
1469 DBUG_ASSERT(info->update_fields->elements ==
1470 info->update_values->elements);
1471 if (fill_record_n_invoke_before_triggers(thd, *info->update_fields,
1472 *info->update_values,
1473 info->ignore,
1474 table->triggers,
1475 TRG_EVENT_UPDATE))
1476 goto before_trg_err;
1478 /* CHECK OPTION for VIEW ... ON DUPLICATE KEY UPDATE ... */
1479 if (info->view &&
1480 (res= info->view->view_check_option(current_thd, info->ignore)) ==
1481 VIEW_CHECK_SKIP)
1482 goto ok_or_after_trg_err;
1483 if (res == VIEW_CHECK_ERROR)
1484 goto before_trg_err;
1486 table->file->restore_auto_increment(prev_insert_id);
1487 if (table->next_number_field)
1488 table->file->adjust_next_insert_id_after_explicit_value(
1489 table->next_number_field->val_int());
1490 info->touched++;
1491 if (!records_are_comparable(table) || compare_records(table))
1493 if ((error=table->file->ha_update_row(table->record[1],
1494 table->record[0])) &&
1495 error != HA_ERR_RECORD_IS_THE_SAME)
1497 if (info->ignore &&
1498 !table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
1500 goto ok_or_after_trg_err;
1502 goto err;
1505 if (error != HA_ERR_RECORD_IS_THE_SAME)
1506 info->updated++;
1507 else
1508 error= 0;
1510 If ON DUP KEY UPDATE updates a row instead of inserting one, it's
1511 like a regular UPDATE statement: it should not affect the value of a
1512 next SELECT LAST_INSERT_ID() or mysql_insert_id().
1513 Except if LAST_INSERT_ID(#) was in the INSERT query, which is
1514 handled separately by THD::arg_of_last_insert_id_function.
1516 insert_id_for_cur_row= table->file->insert_id_for_cur_row= 0;
1517 trg_error= (table->triggers &&
1518 table->triggers->process_triggers(thd, TRG_EVENT_UPDATE,
1519 TRG_ACTION_AFTER, TRUE));
1520 info->copied++;
1523 if (table->next_number_field)
1524 table->file->adjust_next_insert_id_after_explicit_value(
1525 table->next_number_field->val_int());
1526 info->touched++;
1528 goto ok_or_after_trg_err;
1530 else /* DUP_REPLACE */
1533 The manual defines the REPLACE semantics that it is either
1534 an INSERT or DELETE(s) + INSERT; FOREIGN KEY checks in
1535 InnoDB do not function in the defined way if we allow MySQL
1536 to convert the latter operation internally to an UPDATE.
1537 We also should not perform this conversion if we have
1538 timestamp field with ON UPDATE which is different from DEFAULT.
1539 Another case when conversion should not be performed is when
1540 we have ON DELETE trigger on table so user may notice that
1541 we cheat here. Note that it is ok to do such conversion for
1542 tables which have ON UPDATE but have no ON DELETE triggers,
1543 we just should not expose this fact to users by invoking
1544 ON UPDATE triggers.
1546 if (last_uniq_key(table,key_nr) &&
1547 !table->file->referenced_by_foreign_key() &&
1548 (table->timestamp_field_type == TIMESTAMP_NO_AUTO_SET ||
1549 table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH) &&
1550 (!table->triggers || !table->triggers->has_delete_triggers()))
1552 if ((error=table->file->ha_update_row(table->record[1],
1553 table->record[0])) &&
1554 error != HA_ERR_RECORD_IS_THE_SAME)
1555 goto err;
1556 if (error != HA_ERR_RECORD_IS_THE_SAME)
1557 info->deleted++;
1558 else
1559 error= 0;
1560 thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1562 Since we pretend that we have done insert we should call
1563 its after triggers.
1565 goto after_trg_n_copied_inc;
1567 else
1569 if (table->triggers &&
1570 table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
1571 TRG_ACTION_BEFORE, TRUE))
1572 goto before_trg_err;
1573 if ((error=table->file->ha_delete_row(table->record[1])))
1574 goto err;
1575 info->deleted++;
1576 if (!table->file->has_transactions())
1577 thd->transaction.stmt.modified_non_trans_table= TRUE;
1578 if (table->triggers &&
1579 table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
1580 TRG_ACTION_AFTER, TRUE))
1582 trg_error= 1;
1583 goto ok_or_after_trg_err;
1585 /* Let us attempt do write_row() once more */
1591 If more than one iteration of the above while loop is done, from the second
1592 one the row being inserted will have an explicit value in the autoinc field,
1593 which was set at the first call of handler::update_auto_increment(). This
1594 value is saved to avoid thd->insert_id_for_cur_row becoming 0. Use this saved
1595 autoinc value.
1597 if (table->file->insert_id_for_cur_row == 0)
1598 table->file->insert_id_for_cur_row= insert_id_for_cur_row;
1600 thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1602 Restore column maps if they where replaced during an duplicate key
1603 problem.
1605 if (table->read_set != save_read_set ||
1606 table->write_set != save_write_set)
1607 table->column_bitmaps_set(save_read_set, save_write_set);
1609 else if ((error=table->file->ha_write_row(table->record[0])))
1611 DEBUG_SYNC(thd, "write_row_noreplace");
1612 if (!info->ignore ||
1613 table->file->is_fatal_error(error, HA_CHECK_DUP))
1614 goto err;
1615 table->file->restore_auto_increment(prev_insert_id);
1616 goto ok_or_after_trg_err;
1619 after_trg_n_copied_inc:
1620 info->copied++;
1621 thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1622 trg_error= (table->triggers &&
1623 table->triggers->process_triggers(thd, TRG_EVENT_INSERT,
1624 TRG_ACTION_AFTER, TRUE));
1626 ok_or_after_trg_err:
1627 if (key)
1628 my_safe_afree(key,table->s->max_unique_length,MAX_KEY_LENGTH);
1629 if (!table->file->has_transactions())
1630 thd->transaction.stmt.modified_non_trans_table= TRUE;
1631 DBUG_RETURN(trg_error);
1633 err:
1634 info->last_errno= error;
1635 /* current_select is NULL if this is a delayed insert */
1636 if (thd->lex->current_select)
1637 thd->lex->current_select->no_error= 0; // Give error
1638 table->file->print_error(error,MYF(0));
1640 before_trg_err:
1641 table->file->restore_auto_increment(prev_insert_id);
1642 if (key)
1643 my_safe_afree(key, table->s->max_unique_length, MAX_KEY_LENGTH);
1644 table->column_bitmaps_set(save_read_set, save_write_set);
1645 DBUG_RETURN(1);
1649 /******************************************************************************
1650 Check that all fields with arn't null_fields are used
1651 ******************************************************************************/
1653 int check_that_all_fields_are_given_values(THD *thd, TABLE *entry,
1654 TABLE_LIST *table_list)
1656 int err= 0;
1657 MY_BITMAP *write_set= entry->write_set;
1659 for (Field **field=entry->field ; *field ; field++)
1661 if (!bitmap_is_set(write_set, (*field)->field_index) &&
1662 ((*field)->flags & NO_DEFAULT_VALUE_FLAG) &&
1663 ((*field)->real_type() != MYSQL_TYPE_ENUM))
1665 bool view= FALSE;
1666 if (table_list)
1668 table_list= table_list->top_table();
1669 view= test(table_list->view);
1671 if (view)
1673 push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
1674 ER_NO_DEFAULT_FOR_VIEW_FIELD,
1675 ER(ER_NO_DEFAULT_FOR_VIEW_FIELD),
1676 table_list->view_db.str,
1677 table_list->view_name.str);
1679 else
1681 push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
1682 ER_NO_DEFAULT_FOR_FIELD,
1683 ER(ER_NO_DEFAULT_FOR_FIELD),
1684 (*field)->field_name);
1686 err= 1;
1689 return thd->abort_on_warning ? err : 0;
1692 /*****************************************************************************
1693 Handling of delayed inserts
1694 A thread is created for each table that one uses with the DELAYED attribute.
1695 *****************************************************************************/
1697 #ifndef EMBEDDED_LIBRARY
1699 class delayed_row :public ilink {
1700 public:
1701 char *record;
1702 enum_duplicates dup;
1703 time_t start_time;
1704 ulong sql_mode;
1705 bool auto_increment_field_not_null;
1706 bool query_start_used, ignore, log_query;
1707 bool stmt_depends_on_first_successful_insert_id_in_prev_stmt;
1708 ulonglong first_successful_insert_id_in_prev_stmt;
1709 ulonglong forced_insert_id;
1710 ulong auto_increment_increment;
1711 ulong auto_increment_offset;
1712 timestamp_auto_set_type timestamp_field_type;
1713 LEX_STRING query;
1714 Time_zone *time_zone;
1716 delayed_row(LEX_STRING const query_arg, enum_duplicates dup_arg,
1717 bool ignore_arg, bool log_query_arg)
1718 : record(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg),
1719 forced_insert_id(0), query(query_arg), time_zone(0)
1721 ~delayed_row()
1723 x_free(query.str);
1724 x_free(record);
1729 Delayed_insert - context of a thread responsible for delayed insert
1730 into one table. When processing delayed inserts, we create an own
1731 thread for every distinct table. Later on all delayed inserts directed
1732 into that table are handled by a dedicated thread.
1735 class Delayed_insert :public ilink {
1736 uint locks_in_memory;
1737 thr_lock_type delayed_lock;
1738 public:
1739 THD thd;
1740 TABLE *table;
1741 pthread_mutex_t mutex;
1742 pthread_cond_t cond,cond_client;
1743 volatile uint tables_in_use,stacked_inserts;
1744 volatile bool status,dead;
1745 COPY_INFO info;
1746 I_List<delayed_row> rows;
1747 ulong group_count;
1748 TABLE_LIST table_list; // Argument
1750 Delayed_insert()
1751 :locks_in_memory(0),
1752 table(0),tables_in_use(0),stacked_inserts(0), status(0), dead(0),
1753 group_count(0)
1755 thd.security_ctx->user=thd.security_ctx->priv_user=(char*) delayed_user;
1756 thd.security_ctx->host=(char*) my_localhost;
1757 thd.current_tablenr=0;
1758 thd.version=refresh_version;
1759 thd.command=COM_DELAYED_INSERT;
1760 thd.lex->current_select= 0; // for my_message_sql
1761 thd.lex->sql_command= SQLCOM_INSERT; // For innodb::store_lock()
1763 Statement-based replication of INSERT DELAYED has problems with RAND()
1764 and user vars, so in mixed mode we go to row-based.
1766 thd.lex->set_stmt_unsafe();
1767 thd.set_current_stmt_binlog_row_based_if_mixed();
1769 bzero((char*) &thd.net, sizeof(thd.net)); // Safety
1770 bzero((char*) &table_list, sizeof(table_list)); // Safety
1771 thd.system_thread= SYSTEM_THREAD_DELAYED_INSERT;
1772 thd.security_ctx->host_or_ip= "";
1773 bzero((char*) &info,sizeof(info));
1774 pthread_mutex_init(&mutex,MY_MUTEX_INIT_FAST);
1775 pthread_cond_init(&cond,NULL);
1776 pthread_cond_init(&cond_client,NULL);
1777 VOID(pthread_mutex_lock(&LOCK_thread_count));
1778 delayed_insert_threads++;
1779 delayed_lock= global_system_variables.low_priority_updates ?
1780 TL_WRITE_LOW_PRIORITY : TL_WRITE;
1781 VOID(pthread_mutex_unlock(&LOCK_thread_count));
1783 ~Delayed_insert()
1785 /* The following is not really needed, but just for safety */
1786 delayed_row *row;
1787 while ((row=rows.get()))
1788 delete row;
1789 if (table)
1790 close_thread_tables(&thd);
1791 VOID(pthread_mutex_lock(&LOCK_thread_count));
1792 pthread_mutex_destroy(&mutex);
1793 pthread_cond_destroy(&cond);
1794 pthread_cond_destroy(&cond_client);
1795 thd.unlink(); // Must be unlinked under lock
1796 x_free(thd.query());
1797 thd.security_ctx->user= thd.security_ctx->host=0;
1798 thread_count--;
1799 delayed_insert_threads--;
1800 VOID(pthread_mutex_unlock(&LOCK_thread_count));
1801 VOID(pthread_cond_broadcast(&COND_thread_count)); /* Tell main we are ready */
1804 /* The following is for checking when we can delete ourselves */
1805 inline void lock()
1807 locks_in_memory++; // Assume LOCK_delay_insert
1809 void unlock()
1811 pthread_mutex_lock(&LOCK_delayed_insert);
1812 if (!--locks_in_memory)
1814 pthread_mutex_lock(&mutex);
1815 if (thd.killed && ! stacked_inserts && ! tables_in_use)
1817 pthread_cond_signal(&cond);
1818 status=1;
1820 pthread_mutex_unlock(&mutex);
1822 pthread_mutex_unlock(&LOCK_delayed_insert);
1824 inline uint lock_count() { return locks_in_memory; }
1826 TABLE* get_local_table(THD* client_thd);
1827 bool handle_inserts(void);
1831 I_List<Delayed_insert> delayed_threads;
1835 Return an instance of delayed insert thread that can handle
1836 inserts into a given table, if it exists. Otherwise return NULL.
1839 static
1840 Delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list)
1842 thd_proc_info(thd, "waiting for delay_list");
1843 pthread_mutex_lock(&LOCK_delayed_insert); // Protect master list
1844 I_List_iterator<Delayed_insert> it(delayed_threads);
1845 Delayed_insert *di;
1846 while ((di= it++))
1848 if (!strcmp(table_list->db, di->table_list.db) &&
1849 !strcmp(table_list->table_name, di->table_list.table_name))
1851 di->lock();
1852 break;
1855 pthread_mutex_unlock(&LOCK_delayed_insert); // For unlink from list
1856 return di;
1861 Attempt to find or create a delayed insert thread to handle inserts
1862 into this table.
1864 @return In case of success, table_list->table points to a local copy
1865 of the delayed table or is set to NULL, which indicates a
1866 request for lock upgrade. In case of failure, value of
1867 table_list->table is undefined.
1868 @retval TRUE - this thread ran out of resources OR
1869 - a newly created delayed insert thread ran out of
1870 resources OR
1871 - the created thread failed to open and lock the table
1872 (e.g. because it does not exist) OR
1873 - the table opened in the created thread turned out to
1874 be a view
1875 @retval FALSE - table successfully opened OR
1876 - too many delayed insert threads OR
1877 - the table has triggers and we have to fall back to
1878 a normal INSERT
1879 Two latter cases indicate a request for lock upgrade.
1881 XXX: why do we regard INSERT DELAYED into a view as an error and
1882 do not simply perform a lock upgrade?
1884 TODO: The approach with using two mutexes to work with the
1885 delayed thread list -- LOCK_delayed_insert and
1886 LOCK_delayed_create -- is redundant, and we only need one of
1887 them to protect the list. The reason we have two locks is that
1888 we do not want to block look-ups in the list while we're waiting
1889 for the newly created thread to open the delayed table. However,
1890 this wait itself is redundant -- we always call get_local_table
1891 later on, and there wait again until the created thread acquires
1892 a table lock.
1894 As is redundant the concept of locks_in_memory, since we already
1895 have another counter with similar semantics - tables_in_use,
1896 both of them are devoted to counting the number of producers for
1897 a given consumer (delayed insert thread), only at different
1898 stages of producer-consumer relationship.
1900 'dead' and 'status' variables in Delayed_insert are redundant
1901 too, since there is already 'di->thd.killed' and
1902 di->stacked_inserts.
1905 static
1906 bool delayed_get_table(THD *thd, TABLE_LIST *table_list)
1908 int error;
1909 Delayed_insert *di;
1910 DBUG_ENTER("delayed_get_table");
1912 /* Must be set in the parser */
1913 DBUG_ASSERT(table_list->db);
1915 /* Find the thread which handles this table. */
1916 if (!(di= find_handler(thd, table_list)))
1919 No match. Create a new thread to handle the table, but
1920 no more than max_insert_delayed_threads.
1922 if (delayed_insert_threads >= thd->variables.max_insert_delayed_threads)
1923 DBUG_RETURN(0);
1924 thd_proc_info(thd, "Creating delayed handler");
1925 pthread_mutex_lock(&LOCK_delayed_create);
1927 The first search above was done without LOCK_delayed_create.
1928 Another thread might have created the handler in between. Search again.
1930 if (! (di= find_handler(thd, table_list)))
1932 if (!(di= new Delayed_insert()))
1934 thd->fatal_error();
1935 goto end_create;
1937 pthread_mutex_lock(&LOCK_thread_count);
1938 thread_count++;
1939 pthread_mutex_unlock(&LOCK_thread_count);
1940 di->thd.set_db(table_list->db, (uint) strlen(table_list->db));
1941 di->thd.set_query(my_strdup(table_list->table_name, MYF(MY_WME)), 0);
1942 if (di->thd.db == NULL || di->thd.query() == NULL)
1944 /* The error is reported */
1945 delete di;
1946 thd->fatal_error();
1947 goto end_create;
1949 di->table_list= *table_list; // Needed to open table
1950 /* Replace volatile strings with local copies */
1951 di->table_list.alias= di->table_list.table_name= di->thd.query();
1952 di->table_list.db= di->thd.db;
1953 di->lock();
1954 pthread_mutex_lock(&di->mutex);
1955 if ((error= pthread_create(&di->thd.real_id, &connection_attrib,
1956 handle_delayed_insert, (void*) di)))
1958 DBUG_PRINT("error",
1959 ("Can't create thread to handle delayed insert (error %d)",
1960 error));
1961 pthread_mutex_unlock(&di->mutex);
1962 di->unlock();
1963 delete di;
1964 my_error(ER_CANT_CREATE_THREAD, MYF(0), error);
1965 thd->fatal_error();
1966 goto end_create;
1969 /* Wait until table is open */
1970 thd_proc_info(thd, "waiting for handler open");
1971 while (!di->thd.killed && !di->table && !thd->killed)
1973 pthread_cond_wait(&di->cond_client, &di->mutex);
1975 pthread_mutex_unlock(&di->mutex);
1976 thd_proc_info(thd, "got old table");
1977 if (di->thd.killed)
1979 if (di->thd.is_error())
1982 Copy the error message. Note that we don't treat fatal
1983 errors in the delayed thread as fatal errors in the
1984 main thread. Use of my_message will enable stored
1985 procedures continue handlers.
1987 my_message(di->thd.main_da.sql_errno(), di->thd.main_da.message(),
1988 MYF(0));
1990 di->unlock();
1991 goto end_create;
1993 if (thd->killed)
1995 di->unlock();
1996 goto end_create;
1998 pthread_mutex_lock(&LOCK_delayed_insert);
1999 delayed_threads.append(di);
2000 pthread_mutex_unlock(&LOCK_delayed_insert);
2002 pthread_mutex_unlock(&LOCK_delayed_create);
2005 pthread_mutex_lock(&di->mutex);
2006 table_list->table= di->get_local_table(thd);
2007 pthread_mutex_unlock(&di->mutex);
2008 if (table_list->table)
2010 DBUG_ASSERT(! thd->is_error());
2011 thd->di= di;
2013 /* Unlock the delayed insert object after its last access. */
2014 di->unlock();
2015 DBUG_RETURN((table_list->table == NULL));
2017 end_create:
2018 pthread_mutex_unlock(&LOCK_delayed_create);
2019 DBUG_RETURN(thd->is_error());
2024 As we can't let many client threads modify the same TABLE
2025 structure of the dedicated delayed insert thread, we create an
2026 own structure for each client thread. This includes a row
2027 buffer to save the column values and new fields that point to
2028 the new row buffer. The memory is allocated in the client
2029 thread and is freed automatically.
2031 @pre This function is called from the client thread. Delayed
2032 insert thread mutex must be acquired before invoking this
2033 function.
2035 @return Not-NULL table object on success. NULL in case of an error,
2036 which is set in client_thd.
2039 TABLE *Delayed_insert::get_local_table(THD* client_thd)
2041 my_ptrdiff_t adjust_ptrs;
2042 Field **field,**org_field, *found_next_number_field;
2043 TABLE *copy;
2044 TABLE_SHARE *share;
2045 uchar *bitmap;
2046 DBUG_ENTER("Delayed_insert::get_local_table");
2048 /* First request insert thread to get a lock */
2049 status=1;
2050 tables_in_use++;
2051 if (!thd.lock) // Table is not locked
2053 thd_proc_info(client_thd, "waiting for handler lock");
2054 pthread_cond_signal(&cond); // Tell handler to lock table
2055 while (!dead && !thd.lock && ! client_thd->killed)
2057 pthread_cond_wait(&cond_client,&mutex);
2059 thd_proc_info(client_thd, "got handler lock");
2060 if (client_thd->killed)
2061 goto error;
2062 if (dead)
2064 my_message(thd.main_da.sql_errno(), thd.main_da.message(), MYF(0));
2065 goto error;
2068 share= table->s;
2071 Allocate memory for the TABLE object, the field pointers array, and
2072 one record buffer of reclength size. Normally a table has three
2073 record buffers of rec_buff_length size, which includes alignment
2074 bytes. Since the table copy is used for creating one record only,
2075 the other record buffers and alignment are unnecessary.
2077 thd_proc_info(client_thd, "allocating local table");
2078 copy= (TABLE*) client_thd->alloc(sizeof(*copy)+
2079 (share->fields+1)*sizeof(Field**)+
2080 share->reclength +
2081 share->column_bitmap_size*2);
2082 if (!copy)
2083 goto error;
2085 /* Copy the TABLE object. */
2086 *copy= *table;
2087 /* We don't need to change the file handler here */
2088 /* Assign the pointers for the field pointers array and the record. */
2089 field= copy->field= (Field**) (copy + 1);
2090 bitmap= (uchar*) (field + share->fields + 1);
2091 copy->record[0]= (bitmap + share->column_bitmap_size * 2);
2092 memcpy((char*) copy->record[0], (char*) table->record[0], share->reclength);
2094 Make a copy of all fields.
2095 The copied fields need to point into the copied record. This is done
2096 by copying the field objects with their old pointer values and then
2097 "move" the pointers by the distance between the original and copied
2098 records. That way we preserve the relative positions in the records.
2100 adjust_ptrs= PTR_BYTE_DIFF(copy->record[0], table->record[0]);
2101 found_next_number_field= table->found_next_number_field;
2102 for (org_field= table->field; *org_field; org_field++, field++)
2104 if (!(*field= (*org_field)->new_field(client_thd->mem_root, copy, 1)))
2105 goto error;
2106 (*field)->orig_table= copy; // Remove connection
2107 (*field)->move_field_offset(adjust_ptrs); // Point at copy->record[0]
2108 if (*org_field == found_next_number_field)
2109 (*field)->table->found_next_number_field= *field;
2111 *field=0;
2113 /* Adjust timestamp */
2114 if (table->timestamp_field)
2116 /* Restore offset as this may have been reset in handle_inserts */
2117 copy->timestamp_field=
2118 (Field_timestamp*) copy->field[share->timestamp_field_offset];
2119 copy->timestamp_field->unireg_check= table->timestamp_field->unireg_check;
2120 copy->timestamp_field_type= copy->timestamp_field->get_auto_set_type();
2123 /* Adjust in_use for pointing to client thread */
2124 copy->in_use= client_thd;
2126 /* Adjust lock_count. This table object is not part of a lock. */
2127 copy->lock_count= 0;
2129 /* Adjust bitmaps */
2130 copy->def_read_set.bitmap= (my_bitmap_map*) bitmap;
2131 copy->def_write_set.bitmap= ((my_bitmap_map*)
2132 (bitmap + share->column_bitmap_size));
2133 copy->tmp_set.bitmap= 0; // To catch errors
2134 bzero((char*) bitmap, share->column_bitmap_size*2);
2135 copy->read_set= &copy->def_read_set;
2136 copy->write_set= &copy->def_write_set;
2138 DBUG_RETURN(copy);
2140 /* Got fatal error */
2141 error:
2142 tables_in_use--;
2143 status=1;
2144 pthread_cond_signal(&cond); // Inform thread about abort
2145 DBUG_RETURN(0);
2149 /* Put a question in queue */
2151 static
2152 int write_delayed(THD *thd, TABLE *table, enum_duplicates duplic,
2153 LEX_STRING query, bool ignore, bool log_on)
2155 delayed_row *row= 0;
2156 Delayed_insert *di=thd->di;
2157 const Discrete_interval *forced_auto_inc;
2158 DBUG_ENTER("write_delayed");
2159 DBUG_PRINT("enter", ("query = '%s' length %lu", query.str,
2160 (ulong) query.length));
2162 thd_proc_info(thd, "waiting for handler insert");
2163 pthread_mutex_lock(&di->mutex);
2164 while (di->stacked_inserts >= delayed_queue_size && !thd->killed)
2165 pthread_cond_wait(&di->cond_client,&di->mutex);
2166 thd_proc_info(thd, "storing row into queue");
2168 if (thd->killed)
2169 goto err;
2172 Take a copy of the query string, if there is any. The string will
2173 be free'ed when the row is destroyed. If there is no query string,
2174 we don't do anything special.
2177 if (query.str)
2179 char *str;
2180 if (!(str= my_strndup(query.str, query.length, MYF(MY_WME))))
2181 goto err;
2182 query.str= str;
2184 row= new delayed_row(query, duplic, ignore, log_on);
2185 if (row == NULL)
2187 my_free(query.str, MYF(MY_WME));
2188 goto err;
2191 if (!(row->record= (char*) my_malloc(table->s->reclength, MYF(MY_WME))))
2192 goto err;
2193 memcpy(row->record, table->record[0], table->s->reclength);
2194 row->start_time= thd->start_time;
2195 row->query_start_used= thd->query_start_used;
2197 those are for the binlog: LAST_INSERT_ID() has been evaluated at this
2198 time, so record does not need it, but statement-based binlogging of the
2199 INSERT will need when the row is actually inserted.
2200 As for SET INSERT_ID, DELAYED does not honour it (BUG#20830).
2202 row->stmt_depends_on_first_successful_insert_id_in_prev_stmt=
2203 thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
2204 row->first_successful_insert_id_in_prev_stmt=
2205 thd->first_successful_insert_id_in_prev_stmt;
2206 row->timestamp_field_type= table->timestamp_field_type;
2208 /* Add session variable timezone
2209 Time_zone object will not be freed even the thread is ended.
2210 So we can get time_zone object from thread which handling delayed statement.
2211 See the comment of my_tz_find() for detail.
2213 if (thd->time_zone_used)
2215 row->time_zone = thd->variables.time_zone;
2217 else
2219 row->time_zone = NULL;
2221 /* Copy session variables. */
2222 row->auto_increment_increment= thd->variables.auto_increment_increment;
2223 row->auto_increment_offset= thd->variables.auto_increment_offset;
2224 row->sql_mode= thd->variables.sql_mode;
2225 row->auto_increment_field_not_null= table->auto_increment_field_not_null;
2227 /* Copy the next forced auto increment value, if any. */
2228 if ((forced_auto_inc= thd->auto_inc_intervals_forced.get_next()))
2230 row->forced_insert_id= forced_auto_inc->minimum();
2231 DBUG_PRINT("delayed", ("transmitting auto_inc: %lu",
2232 (ulong) row->forced_insert_id));
2235 di->rows.push_back(row);
2236 di->stacked_inserts++;
2237 di->status=1;
2238 if (table->s->blob_fields)
2239 unlink_blobs(table);
2240 pthread_cond_signal(&di->cond);
2242 thread_safe_increment(delayed_rows_in_use,&LOCK_delayed_status);
2243 pthread_mutex_unlock(&di->mutex);
2244 DBUG_RETURN(0);
2246 err:
2247 delete row;
2248 pthread_mutex_unlock(&di->mutex);
2249 DBUG_RETURN(1);
2253 Signal the delayed insert thread that this user connection
2254 is finished using it for this statement.
2257 static void end_delayed_insert(THD *thd)
2259 DBUG_ENTER("end_delayed_insert");
2260 Delayed_insert *di=thd->di;
2261 pthread_mutex_lock(&di->mutex);
2262 DBUG_PRINT("info",("tables in use: %d",di->tables_in_use));
2263 if (!--di->tables_in_use || di->thd.killed)
2264 { // Unlock table
2265 di->status=1;
2266 pthread_cond_signal(&di->cond);
2268 pthread_mutex_unlock(&di->mutex);
2269 DBUG_VOID_RETURN;
2273 /* We kill all delayed threads when doing flush-tables */
2275 void kill_delayed_threads(void)
2277 VOID(pthread_mutex_lock(&LOCK_delayed_insert)); // For unlink from list
2279 I_List_iterator<Delayed_insert> it(delayed_threads);
2280 Delayed_insert *di;
2281 while ((di= it++))
2283 di->thd.killed= THD::KILL_CONNECTION;
2284 if (di->thd.mysys_var)
2286 pthread_mutex_lock(&di->thd.mysys_var->mutex);
2287 if (di->thd.mysys_var->current_cond)
2290 We need the following test because the main mutex may be locked
2291 in handle_delayed_insert()
2293 if (&di->mutex != di->thd.mysys_var->current_mutex)
2294 pthread_mutex_lock(di->thd.mysys_var->current_mutex);
2295 pthread_cond_broadcast(di->thd.mysys_var->current_cond);
2296 if (&di->mutex != di->thd.mysys_var->current_mutex)
2297 pthread_mutex_unlock(di->thd.mysys_var->current_mutex);
2299 pthread_mutex_unlock(&di->thd.mysys_var->mutex);
2302 VOID(pthread_mutex_unlock(&LOCK_delayed_insert)); // For unlink from list
2306 static void handle_delayed_insert_impl(THD *thd, Delayed_insert *di)
2308 DBUG_ENTER("handle_delayed_insert_impl");
2309 thd->thread_stack= (char*) &thd;
2310 if (init_thr_lock() || thd->store_globals())
2312 /* Can't use my_error since store_globals has perhaps failed */
2313 thd->main_da.set_error_status(thd, ER_OUT_OF_RESOURCES,
2314 ER(ER_OUT_OF_RESOURCES));
2315 thd->fatal_error();
2316 goto err;
2320 Open table requires an initialized lex in case the table is
2321 partitioned. The .frm file contains a partial SQL string which is
2322 parsed using a lex, that depends on initialized thd->lex.
2324 lex_start(thd);
2325 thd->lex->sql_command= SQLCOM_INSERT; // For innodb::store_lock()
2327 Statement-based replication of INSERT DELAYED has problems with RAND()
2328 and user vars, so in mixed mode we go to row-based.
2330 thd->lex->set_stmt_unsafe();
2331 thd->set_current_stmt_binlog_row_based_if_mixed();
2333 /* Open table */
2334 if (!(di->table= open_n_lock_single_table(thd, &di->table_list,
2335 TL_WRITE_DELAYED)))
2337 thd->fatal_error(); // Abort waiting inserts
2338 goto err;
2340 if (!(di->table->file->ha_table_flags() & HA_CAN_INSERT_DELAYED))
2342 thd->fatal_error();
2343 my_error(ER_DELAYED_NOT_SUPPORTED, MYF(0), di->table_list.table_name);
2344 goto err;
2346 if (di->table->triggers)
2349 Table has triggers. This is not an error, but we do
2350 not support triggers with delayed insert. Terminate the delayed
2351 thread without an error and thus request lock upgrade.
2353 goto err;
2355 di->table->copy_blobs=1;
2357 /* Tell client that the thread is initialized */
2358 pthread_cond_signal(&di->cond_client);
2360 /* Now wait until we get an insert or lock to handle */
2361 /* We will not abort as long as a client thread uses this thread */
2363 for (;;)
2365 if (thd->killed == THD::KILL_CONNECTION)
2367 uint lock_count;
2369 Remove this from delay insert list so that no one can request a
2370 table from this
2372 pthread_mutex_unlock(&di->mutex);
2373 pthread_mutex_lock(&LOCK_delayed_insert);
2374 di->unlink();
2375 lock_count=di->lock_count();
2376 pthread_mutex_unlock(&LOCK_delayed_insert);
2377 pthread_mutex_lock(&di->mutex);
2378 if (!lock_count && !di->tables_in_use && !di->stacked_inserts)
2379 break; // Time to die
2382 if (!di->status && !di->stacked_inserts)
2384 struct timespec abstime;
2385 set_timespec(abstime, delayed_insert_timeout);
2387 /* Information for pthread_kill */
2388 di->thd.mysys_var->current_mutex= &di->mutex;
2389 di->thd.mysys_var->current_cond= &di->cond;
2390 thd_proc_info(&(di->thd), "Waiting for INSERT");
2392 DBUG_PRINT("info",("Waiting for someone to insert rows"));
2393 while (!thd->killed)
2395 int error;
2396 #if defined(HAVE_BROKEN_COND_TIMEDWAIT)
2397 error=pthread_cond_wait(&di->cond,&di->mutex);
2398 #else
2399 error=pthread_cond_timedwait(&di->cond,&di->mutex,&abstime);
2400 #ifdef EXTRA_DEBUG
2401 if (error && error != EINTR && error != ETIMEDOUT)
2403 fprintf(stderr, "Got error %d from pthread_cond_timedwait\n",error);
2404 DBUG_PRINT("error",("Got error %d from pthread_cond_timedwait",
2405 error));
2407 #endif
2408 #endif
2409 if (thd->killed || di->status)
2410 break;
2411 if (error == ETIMEDOUT || error == ETIME)
2413 thd->killed= THD::KILL_CONNECTION;
2414 break;
2417 /* We can't lock di->mutex and mysys_var->mutex at the same time */
2418 pthread_mutex_unlock(&di->mutex);
2419 pthread_mutex_lock(&di->thd.mysys_var->mutex);
2420 di->thd.mysys_var->current_mutex= 0;
2421 di->thd.mysys_var->current_cond= 0;
2422 pthread_mutex_unlock(&di->thd.mysys_var->mutex);
2423 pthread_mutex_lock(&di->mutex);
2425 thd_proc_info(&(di->thd), 0);
2427 if (di->tables_in_use && ! thd->lock)
2429 bool not_used;
2431 Request for new delayed insert.
2432 Lock the table, but avoid to be blocked by a global read lock.
2433 If we got here while a global read lock exists, then one or more
2434 inserts started before the lock was requested. These are allowed
2435 to complete their work before the server returns control to the
2436 client which requested the global read lock. The delayed insert
2437 handler will close the table and finish when the outstanding
2438 inserts are done.
2440 if (! (thd->lock= mysql_lock_tables(thd, &di->table, 1,
2441 MYSQL_LOCK_IGNORE_GLOBAL_READ_LOCK,
2442 &not_used)))
2444 /* Fatal error */
2445 di->dead= 1;
2446 thd->killed= THD::KILL_CONNECTION;
2448 pthread_cond_broadcast(&di->cond_client);
2450 if (di->stacked_inserts)
2452 if (di->handle_inserts())
2454 /* Some fatal error */
2455 di->dead= 1;
2456 thd->killed= THD::KILL_CONNECTION;
2459 di->status=0;
2460 if (!di->stacked_inserts && !di->tables_in_use && thd->lock)
2463 No one is doing a insert delayed
2464 Unlock table so that other threads can use it
2466 MYSQL_LOCK *lock=thd->lock;
2467 thd->lock=0;
2468 pthread_mutex_unlock(&di->mutex);
2470 We need to release next_insert_id before unlocking. This is
2471 enforced by handler::ha_external_lock().
2473 di->table->file->ha_release_auto_increment();
2474 mysql_unlock_tables(thd, lock);
2475 ha_autocommit_or_rollback(thd, 0);
2476 di->group_count=0;
2477 pthread_mutex_lock(&di->mutex);
2479 if (di->tables_in_use)
2480 pthread_cond_broadcast(&di->cond_client); // If waiting clients
2483 err:
2485 mysql_lock_tables() can potentially start a transaction and write
2486 a table map. In the event of an error, that transaction has to be
2487 rolled back. We only need to roll back a potential statement
2488 transaction, since real transactions are rolled back in
2489 close_thread_tables().
2491 TODO: This is not true any more, table maps are generated on the
2492 first call to ha_*_row() instead. Remove code that are used to
2493 cover for the case outlined above.
2495 ha_autocommit_or_rollback(thd, 1);
2497 DBUG_VOID_RETURN;
2502 * Create a new delayed insert thread
2505 pthread_handler_t handle_delayed_insert(void *arg)
2507 Delayed_insert *di=(Delayed_insert*) arg;
2508 THD *thd= &di->thd;
2510 pthread_detach_this_thread();
2511 /* Add thread to THD list so that's it's visible in 'show processlist' */
2512 pthread_mutex_lock(&LOCK_thread_count);
2513 thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
2514 thd->set_current_time();
2515 threads.append(thd);
2516 thd->killed=abort_loop ? THD::KILL_CONNECTION : THD::NOT_KILLED;
2517 pthread_mutex_unlock(&LOCK_thread_count);
2520 Wait until the client runs into pthread_cond_wait(),
2521 where we free it after the table is opened and di linked in the list.
2522 If we did not wait here, the client might detect the opened table
2523 before it is linked to the list. It would release LOCK_delayed_create
2524 and allow another thread to create another handler for the same table,
2525 since it does not find one in the list.
2527 pthread_mutex_lock(&di->mutex);
2528 #if !defined( __WIN__) /* Win32 calls this in pthread_create */
2529 if (my_thread_init())
2531 /* Can't use my_error since store_globals has not yet been called */
2532 thd->main_da.set_error_status(thd, ER_OUT_OF_RESOURCES,
2533 ER(ER_OUT_OF_RESOURCES));
2534 goto end;
2536 #endif
2538 handle_delayed_insert_impl(thd, di);
2540 #ifndef __WIN__
2541 end:
2542 #endif
2544 di should be unlinked from the thread handler list and have no active
2545 clients
2548 close_thread_tables(thd); // Free the table
2549 di->table=0;
2550 di->dead= 1; // If error
2551 thd->killed= THD::KILL_CONNECTION; // If error
2552 pthread_cond_broadcast(&di->cond_client); // Safety
2553 pthread_mutex_unlock(&di->mutex);
2555 pthread_mutex_lock(&LOCK_delayed_create); // Because of delayed_get_table
2556 pthread_mutex_lock(&LOCK_delayed_insert);
2557 delete di;
2558 pthread_mutex_unlock(&LOCK_delayed_insert);
2559 pthread_mutex_unlock(&LOCK_delayed_create);
2561 my_thread_end();
2562 pthread_exit(0);
2564 return 0;
2568 /* Remove pointers from temporary fields to allocated values */
2570 static void unlink_blobs(register TABLE *table)
2572 for (Field **ptr=table->field ; *ptr ; ptr++)
2574 if ((*ptr)->flags & BLOB_FLAG)
2575 ((Field_blob *) (*ptr))->clear_temporary();
2579 /* Free blobs stored in current row */
2581 static void free_delayed_insert_blobs(register TABLE *table)
2583 for (Field **ptr=table->field ; *ptr ; ptr++)
2585 if ((*ptr)->flags & BLOB_FLAG)
2587 uchar *str;
2588 ((Field_blob *) (*ptr))->get_ptr(&str);
2589 my_free(str,MYF(MY_ALLOW_ZERO_PTR));
2590 ((Field_blob *) (*ptr))->reset();
2596 bool Delayed_insert::handle_inserts(void)
2598 int error;
2599 ulong max_rows;
2600 bool using_ignore= 0, using_opt_replace= 0,
2601 using_bin_log= mysql_bin_log.is_open();
2602 delayed_row *row;
2603 DBUG_ENTER("handle_inserts");
2605 /* Allow client to insert new rows */
2606 pthread_mutex_unlock(&mutex);
2608 table->next_number_field=table->found_next_number_field;
2609 table->use_all_columns();
2611 thd_proc_info(&thd, "upgrading lock");
2612 if (thr_upgrade_write_delay_lock(*thd.lock->locks, delayed_lock))
2615 This can happen if thread is killed either by a shutdown
2616 or if another thread is removing the current table definition
2617 from the table cache.
2619 my_error(ER_DELAYED_CANT_CHANGE_LOCK,MYF(ME_FATALERROR),
2620 table->s->table_name.str);
2621 goto err;
2624 thd_proc_info(&thd, "insert");
2625 max_rows= delayed_insert_limit;
2626 if (thd.killed || table->needs_reopen_or_name_lock())
2628 thd.killed= THD::KILL_CONNECTION;
2629 max_rows= ULONG_MAX; // Do as much as possible
2633 We can't use row caching when using the binary log because if
2634 we get a crash, then binary log will contain rows that are not yet
2635 written to disk, which will cause problems in replication.
2637 if (!using_bin_log)
2638 table->file->extra(HA_EXTRA_WRITE_CACHE);
2639 pthread_mutex_lock(&mutex);
2641 while ((row=rows.get()))
2643 stacked_inserts--;
2644 pthread_mutex_unlock(&mutex);
2645 memcpy(table->record[0],row->record,table->s->reclength);
2647 thd.start_time=row->start_time;
2648 thd.query_start_used=row->query_start_used;
2650 To get the exact auto_inc interval to store in the binlog we must not
2651 use values from the previous interval (of the previous rows).
2653 bool log_query= (row->log_query && row->query.str != NULL);
2654 DBUG_PRINT("delayed", ("query: '%s' length: %lu", row->query.str ?
2655 row->query.str : "[NULL]",
2656 (ulong) row->query.length));
2657 if (log_query)
2660 This is the first value of an INSERT statement.
2661 It is the right place to clear a forced insert_id.
2662 This is usually done after the last value of an INSERT statement,
2663 but we won't know this in the insert delayed thread. But before
2664 the first value is sufficiently equivalent to after the last
2665 value of the previous statement.
2667 table->file->ha_release_auto_increment();
2668 thd.auto_inc_intervals_in_cur_stmt_for_binlog.empty();
2670 thd.first_successful_insert_id_in_prev_stmt=
2671 row->first_successful_insert_id_in_prev_stmt;
2672 thd.stmt_depends_on_first_successful_insert_id_in_prev_stmt=
2673 row->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
2674 table->timestamp_field_type= row->timestamp_field_type;
2675 table->auto_increment_field_not_null= row->auto_increment_field_not_null;
2677 /* Copy the session variables. */
2678 thd.variables.auto_increment_increment= row->auto_increment_increment;
2679 thd.variables.auto_increment_offset= row->auto_increment_offset;
2680 thd.variables.sql_mode= row->sql_mode;
2682 /* Copy a forced insert_id, if any. */
2683 if (row->forced_insert_id)
2685 DBUG_PRINT("delayed", ("received auto_inc: %lu",
2686 (ulong) row->forced_insert_id));
2687 thd.force_one_auto_inc_interval(row->forced_insert_id);
2690 info.ignore= row->ignore;
2691 info.handle_duplicates= row->dup;
2692 if (info.ignore ||
2693 info.handle_duplicates != DUP_ERROR)
2695 table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
2696 using_ignore=1;
2698 if (info.handle_duplicates == DUP_REPLACE &&
2699 (!table->triggers ||
2700 !table->triggers->has_delete_triggers()))
2702 table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
2703 using_opt_replace= 1;
2705 if (info.handle_duplicates == DUP_UPDATE)
2706 table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
2707 thd.clear_error(); // reset error for binlog
2708 if (write_record(&thd, table, &info))
2710 info.error_count++; // Ignore errors
2711 thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
2712 row->log_query = 0;
2715 if (using_ignore)
2717 using_ignore=0;
2718 table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
2720 if (using_opt_replace)
2722 using_opt_replace= 0;
2723 table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
2726 if (log_query && mysql_bin_log.is_open())
2728 bool backup_time_zone_used = thd.time_zone_used;
2729 Time_zone *backup_time_zone = thd.variables.time_zone;
2730 if (row->time_zone != NULL)
2732 thd.time_zone_used = true;
2733 thd.variables.time_zone = row->time_zone;
2736 /* if the delayed insert was killed, the killed status is
2737 ignored while binlogging */
2738 int errcode= 0;
2739 if (thd.killed == THD::NOT_KILLED)
2740 errcode= query_error_code(&thd, TRUE);
2743 If the query has several rows to insert, only the first row will come
2744 here. In row-based binlogging, this means that the first row will be
2745 written to binlog as one Table_map event and one Rows event (due to an
2746 event flush done in binlog_query()), then all other rows of this query
2747 will be binlogged together as one single Table_map event and one
2748 single Rows event.
2750 if (thd.binlog_query(THD::ROW_QUERY_TYPE,
2751 row->query.str, row->query.length,
2752 FALSE, FALSE, errcode))
2753 goto err;
2755 thd.time_zone_used = backup_time_zone_used;
2756 thd.variables.time_zone = backup_time_zone;
2759 if (table->s->blob_fields)
2760 free_delayed_insert_blobs(table);
2761 thread_safe_decrement(delayed_rows_in_use,&LOCK_delayed_status);
2762 thread_safe_increment(delayed_insert_writes,&LOCK_delayed_status);
2763 pthread_mutex_lock(&mutex);
2765 delete row;
2767 Let READ clients do something once in a while
2768 We should however not break in the middle of a multi-line insert
2769 if we have binary logging enabled as we don't want other commands
2770 on this table until all entries has been processed
2772 if (group_count++ >= max_rows && (row= rows.head()) &&
2773 (!(row->log_query & using_bin_log)))
2775 group_count=0;
2776 if (stacked_inserts || tables_in_use) // Let these wait a while
2778 if (tables_in_use)
2779 pthread_cond_broadcast(&cond_client); // If waiting clients
2780 thd_proc_info(&thd, "reschedule");
2781 pthread_mutex_unlock(&mutex);
2782 if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
2784 /* This should never happen */
2785 table->file->print_error(error,MYF(0));
2786 sql_print_error("%s", thd.main_da.message());
2787 DBUG_PRINT("error", ("HA_EXTRA_NO_CACHE failed in loop"));
2788 goto err;
2790 query_cache_invalidate3(&thd, table, 1);
2791 if (thr_reschedule_write_lock(*thd.lock->locks))
2793 /* This is not known to happen. */
2794 my_error(ER_DELAYED_CANT_CHANGE_LOCK,MYF(ME_FATALERROR),
2795 table->s->table_name.str);
2796 goto err;
2798 if (!using_bin_log)
2799 table->file->extra(HA_EXTRA_WRITE_CACHE);
2800 pthread_mutex_lock(&mutex);
2801 thd_proc_info(&thd, "insert");
2803 if (tables_in_use)
2804 pthread_cond_broadcast(&cond_client); // If waiting clients
2807 thd_proc_info(&thd, 0);
2808 pthread_mutex_unlock(&mutex);
2811 We need to flush the pending event when using row-based
2812 replication since the flushing normally done in binlog_query() is
2813 not done last in the statement: for delayed inserts, the insert
2814 statement is logged *before* all rows are inserted.
2816 We can flush the pending event without checking the thd->lock
2817 since the delayed insert *thread* is not inside a stored function
2818 or trigger.
2820 TODO: Move the logging to last in the sequence of rows.
2822 if (thd.current_stmt_binlog_row_based &&
2823 thd.binlog_flush_pending_rows_event(TRUE))
2824 goto err;
2826 if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
2827 { // This shouldn't happen
2828 table->file->print_error(error,MYF(0));
2829 sql_print_error("%s", thd.main_da.message());
2830 DBUG_PRINT("error", ("HA_EXTRA_NO_CACHE failed after loop"));
2831 goto err;
2833 query_cache_invalidate3(&thd, table, 1);
2834 pthread_mutex_lock(&mutex);
2835 DBUG_RETURN(0);
2837 err:
2838 #ifndef DBUG_OFF
2839 max_rows= 0; // For DBUG output
2840 #endif
2841 /* Remove all not used rows */
2842 while ((row=rows.get()))
2844 if (table->s->blob_fields)
2846 memcpy(table->record[0],row->record,table->s->reclength);
2847 free_delayed_insert_blobs(table);
2849 delete row;
2850 thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
2851 stacked_inserts--;
2852 #ifndef DBUG_OFF
2853 max_rows++;
2854 #endif
2856 DBUG_PRINT("error", ("dropped %lu rows after an error", max_rows));
2857 thread_safe_increment(delayed_insert_errors, &LOCK_delayed_status);
2858 pthread_mutex_lock(&mutex);
2859 DBUG_RETURN(1);
2861 #endif /* EMBEDDED_LIBRARY */
2863 /***************************************************************************
2864 Store records in INSERT ... SELECT *
2865 ***************************************************************************/
2869 make insert specific preparation and checks after opening tables
2871 SYNOPSIS
2872 mysql_insert_select_prepare()
2873 thd thread handler
2875 RETURN
2876 FALSE OK
2877 TRUE Error
2880 bool mysql_insert_select_prepare(THD *thd)
2882 LEX *lex= thd->lex;
2883 SELECT_LEX *select_lex= &lex->select_lex;
2884 TABLE_LIST *first_select_leaf_table;
2885 DBUG_ENTER("mysql_insert_select_prepare");
2888 Statement-based replication of INSERT ... SELECT ... LIMIT is not safe
2889 as order of rows is not defined, so in mixed mode we go to row-based.
2891 Note that we may consider a statement as safe if ORDER BY primary_key
2892 is present or we SELECT a constant. However it may confuse users to
2893 see very similiar statements replicated differently.
2895 if (lex->current_select->select_limit)
2897 lex->set_stmt_unsafe();
2898 thd->set_current_stmt_binlog_row_based_if_mixed();
2901 SELECT_LEX do not belong to INSERT statement, so we can't add WHERE
2902 clause if table is VIEW
2905 if (mysql_prepare_insert(thd, lex->query_tables,
2906 lex->query_tables->table, lex->field_list, 0,
2907 lex->update_list, lex->value_list,
2908 lex->duplicates,
2909 &select_lex->where, TRUE, FALSE, FALSE))
2910 DBUG_RETURN(TRUE);
2913 exclude first table from leaf tables list, because it belong to
2914 INSERT
2916 DBUG_ASSERT(select_lex->leaf_tables != 0);
2917 lex->leaf_tables_insert= select_lex->leaf_tables;
2918 /* skip all leaf tables belonged to view where we are insert */
2919 for (first_select_leaf_table= select_lex->leaf_tables->next_leaf;
2920 first_select_leaf_table &&
2921 first_select_leaf_table->belong_to_view &&
2922 first_select_leaf_table->belong_to_view ==
2923 lex->leaf_tables_insert->belong_to_view;
2924 first_select_leaf_table= first_select_leaf_table->next_leaf)
2926 select_lex->leaf_tables= first_select_leaf_table;
2927 DBUG_RETURN(FALSE);
2931 select_insert::select_insert(TABLE_LIST *table_list_par, TABLE *table_par,
2932 List<Item> *fields_par,
2933 List<Item> *update_fields,
2934 List<Item> *update_values,
2935 enum_duplicates duplic,
2936 bool ignore_check_option_errors)
2937 :table_list(table_list_par), table(table_par), fields(fields_par),
2938 autoinc_value_of_last_inserted_row(0),
2939 insert_into_view(table_list_par && table_list_par->view != 0)
2941 bzero((char*) &info,sizeof(info));
2942 info.handle_duplicates= duplic;
2943 info.ignore= ignore_check_option_errors;
2944 info.update_fields= update_fields;
2945 info.update_values= update_values;
2946 if (table_list_par)
2947 info.view= (table_list_par->view ? table_list_par : 0);
2952 select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
2954 LEX *lex= thd->lex;
2955 int res;
2956 table_map map= 0;
2957 SELECT_LEX *lex_current_select_save= lex->current_select;
2958 DBUG_ENTER("select_insert::prepare");
2960 unit= u;
2963 Since table in which we are going to insert is added to the first
2964 select, LEX::current_select should point to the first select while
2965 we are fixing fields from insert list.
2967 lex->current_select= &lex->select_lex;
2969 /* Errors during check_insert_fields() should not be ignored. */
2970 lex->current_select->no_error= FALSE;
2971 res= check_insert_fields(thd, table_list, *fields, values,
2972 !insert_into_view, &map) ||
2973 setup_fields(thd, 0, values, MARK_COLUMNS_READ, 0, 0);
2975 if (!res && fields->elements)
2977 bool saved_abort_on_warning= thd->abort_on_warning;
2978 thd->abort_on_warning= !info.ignore && (thd->variables.sql_mode &
2979 (MODE_STRICT_TRANS_TABLES |
2980 MODE_STRICT_ALL_TABLES));
2981 res= check_that_all_fields_are_given_values(thd, table_list->table,
2982 table_list);
2983 thd->abort_on_warning= saved_abort_on_warning;
2986 if (info.handle_duplicates == DUP_UPDATE && !res)
2988 Name_resolution_context *context= &lex->select_lex.context;
2989 Name_resolution_context_state ctx_state;
2991 /* Save the state of the current name resolution context. */
2992 ctx_state.save_state(context, table_list);
2994 /* Perform name resolution only in the first table - 'table_list'. */
2995 table_list->next_local= 0;
2996 context->resolve_in_table_list_only(table_list);
2998 lex->select_lex.no_wrap_view_item= TRUE;
2999 res= res || check_update_fields(thd, context->table_list,
3000 *info.update_fields, &map);
3001 lex->select_lex.no_wrap_view_item= FALSE;
3003 When we are not using GROUP BY and there are no ungrouped aggregate functions
3004 we can refer to other tables in the ON DUPLICATE KEY part.
3005 We use next_name_resolution_table descructively, so check it first (views?)
3007 DBUG_ASSERT (!table_list->next_name_resolution_table);
3008 if (lex->select_lex.group_list.elements == 0 &&
3009 !lex->select_lex.with_sum_func)
3011 We must make a single context out of the two separate name resolution contexts :
3012 the INSERT table and the tables in the SELECT part of INSERT ... SELECT.
3013 To do that we must concatenate the two lists
3015 table_list->next_name_resolution_table=
3016 ctx_state.get_first_name_resolution_table();
3018 res= res || setup_fields(thd, 0, *info.update_values,
3019 MARK_COLUMNS_READ, 0, 0);
3020 if (!res)
3023 Traverse the update values list and substitute fields from the
3024 select for references (Item_ref objects) to them. This is done in
3025 order to get correct values from those fields when the select
3026 employs a temporary table.
3028 List_iterator<Item> li(*info.update_values);
3029 Item *item;
3031 while ((item= li++))
3033 item->transform(&Item::update_value_transformer,
3034 (uchar*)lex->current_select);
3038 /* Restore the current context. */
3039 ctx_state.restore_state(context, table_list);
3042 lex->current_select= lex_current_select_save;
3043 if (res)
3044 DBUG_RETURN(1);
3046 if it is INSERT into join view then check_insert_fields already found
3047 real table for insert
3049 table= table_list->table;
3052 Is table which we are changing used somewhere in other parts of
3053 query
3055 if (unique_table(thd, table_list, table_list->next_global, 0))
3057 /* Using same table for INSERT and SELECT */
3058 lex->current_select->options|= OPTION_BUFFER_RESULT;
3059 lex->current_select->join->select_options|= OPTION_BUFFER_RESULT;
3061 else if (!(lex->current_select->options & OPTION_BUFFER_RESULT) &&
3062 !thd->prelocked_mode)
3065 We must not yet prepare the result table if it is the same as one of the
3066 source tables (INSERT SELECT). The preparation may disable
3067 indexes on the result table, which may be used during the select, if it
3068 is the same table (Bug #6034). Do the preparation after the select phase
3069 in select_insert::prepare2().
3070 We won't start bulk inserts at all if this statement uses functions or
3071 should invoke triggers since they may access to the same table too.
3073 table->file->ha_start_bulk_insert((ha_rows) 0);
3075 restore_record(table,s->default_values); // Get empty record
3076 table->next_number_field=table->found_next_number_field;
3078 #ifdef HAVE_REPLICATION
3079 if (thd->slave_thread &&
3080 (info.handle_duplicates == DUP_UPDATE) &&
3081 (table->next_number_field != NULL) &&
3082 rpl_master_has_bug(&active_mi->rli, 24432, TRUE, NULL, NULL))
3083 DBUG_RETURN(1);
3084 #endif
3086 thd->cuted_fields=0;
3087 if (info.ignore || info.handle_duplicates != DUP_ERROR)
3088 table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
3089 if (info.handle_duplicates == DUP_REPLACE &&
3090 (!table->triggers || !table->triggers->has_delete_triggers()))
3091 table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
3092 if (info.handle_duplicates == DUP_UPDATE)
3093 table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
3094 thd->abort_on_warning= (!info.ignore &&
3095 (thd->variables.sql_mode &
3096 (MODE_STRICT_TRANS_TABLES |
3097 MODE_STRICT_ALL_TABLES)));
3098 res= (table_list->prepare_where(thd, 0, TRUE) ||
3099 table_list->prepare_check_option(thd));
3101 if (!res)
3102 prepare_triggers_for_insert_stmt(table);
3104 DBUG_RETURN(res);
3109 Finish the preparation of the result table.
3111 SYNOPSIS
3112 select_insert::prepare2()
3113 void
3115 DESCRIPTION
3116 If the result table is the same as one of the source tables (INSERT SELECT),
3117 the result table is not finally prepared at the join prepair phase.
3118 Do the final preparation now.
3120 RETURN
3121 0 OK
3124 int select_insert::prepare2(void)
3126 DBUG_ENTER("select_insert::prepare2");
3127 if (thd->lex->current_select->options & OPTION_BUFFER_RESULT &&
3128 !thd->prelocked_mode)
3129 table->file->ha_start_bulk_insert((ha_rows) 0);
3130 DBUG_RETURN(0);
3134 void select_insert::cleanup()
3136 /* select_insert/select_create are never re-used in prepared statement */
3137 DBUG_ASSERT(0);
3140 select_insert::~select_insert()
3142 DBUG_ENTER("~select_insert");
3143 if (table)
3145 table->next_number_field=0;
3146 table->auto_increment_field_not_null= FALSE;
3147 table->file->ha_reset();
3149 thd->count_cuted_fields= CHECK_FIELD_IGNORE;
3150 thd->abort_on_warning= 0;
3151 DBUG_VOID_RETURN;
3155 bool select_insert::send_data(List<Item> &values)
3157 DBUG_ENTER("select_insert::send_data");
3158 bool error=0;
3160 if (unit->offset_limit_cnt)
3161 { // using limit offset,count
3162 unit->offset_limit_cnt--;
3163 DBUG_RETURN(0);
3166 thd->count_cuted_fields= CHECK_FIELD_WARN; // Calculate cuted fields
3167 store_values(values);
3168 thd->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
3169 if (thd->is_error())
3171 table->auto_increment_field_not_null= FALSE;
3172 DBUG_RETURN(1);
3174 if (table_list) // Not CREATE ... SELECT
3176 switch (table_list->view_check_option(thd, info.ignore)) {
3177 case VIEW_CHECK_SKIP:
3178 DBUG_RETURN(0);
3179 case VIEW_CHECK_ERROR:
3180 DBUG_RETURN(1);
3184 // Release latches in case bulk insert takes a long time
3185 ha_release_temporary_latches(thd);
3187 error= write_record(thd, table, &info);
3188 table->auto_increment_field_not_null= FALSE;
3190 if (!error)
3192 if (table->triggers || info.handle_duplicates == DUP_UPDATE)
3195 Restore fields of the record since it is possible that they were
3196 changed by ON DUPLICATE KEY UPDATE clause.
3198 If triggers exist then whey can modify some fields which were not
3199 originally touched by INSERT ... SELECT, so we have to restore
3200 their original values for the next row.
3202 restore_record(table, s->default_values);
3204 if (table->next_number_field)
3207 If no value has been autogenerated so far, we need to remember the
3208 value we just saw, we may need to send it to client in the end.
3210 if (thd->first_successful_insert_id_in_cur_stmt == 0) // optimization
3211 autoinc_value_of_last_inserted_row=
3212 table->next_number_field->val_int();
3214 Clear auto-increment field for the next record, if triggers are used
3215 we will clear it twice, but this should be cheap.
3217 table->next_number_field->reset();
3220 DBUG_RETURN(error);
3224 void select_insert::store_values(List<Item> &values)
3226 if (fields->elements)
3227 fill_record_n_invoke_before_triggers(thd, *fields, values, 1,
3228 table->triggers, TRG_EVENT_INSERT);
3229 else
3230 fill_record_n_invoke_before_triggers(thd, table->field, values, 1,
3231 table->triggers, TRG_EVENT_INSERT);
3234 void select_insert::send_error(uint errcode,const char *err)
3236 DBUG_ENTER("select_insert::send_error");
3238 my_message(errcode, err, MYF(0));
3240 DBUG_VOID_RETURN;
3244 bool select_insert::send_eof()
3246 int error;
3247 bool const trans_table= table->file->has_transactions();
3248 ulonglong id;
3249 bool changed;
3250 THD::killed_state killed_status= thd->killed;
3251 DBUG_ENTER("select_insert::send_eof");
3252 DBUG_PRINT("enter", ("trans_table=%d, table_type='%s'",
3253 trans_table, table->file->table_type()));
3255 error= (!thd->prelocked_mode) ? table->file->ha_end_bulk_insert():0;
3256 table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
3257 table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
3259 changed= (info.copied || info.deleted || info.updated);
3260 if (changed)
3263 We must invalidate the table in the query cache before binlog writing
3264 and ha_autocommit_or_rollback.
3266 query_cache_invalidate3(thd, table, 1);
3267 if (thd->transaction.stmt.modified_non_trans_table)
3268 thd->transaction.all.modified_non_trans_table= TRUE;
3270 DBUG_ASSERT(trans_table || !changed ||
3271 thd->transaction.stmt.modified_non_trans_table);
3274 Write to binlog before commiting transaction. No statement will
3275 be written by the write_to_binlog() below in RBR mode. All the
3276 events are in the transaction cache and will be written when
3277 ha_autocommit_or_rollback() is issued below.
3279 if (mysql_bin_log.is_open() &&
3280 (!error || thd->transaction.stmt.modified_non_trans_table))
3282 int errcode= 0;
3283 if (!error)
3284 thd->clear_error();
3285 else
3286 errcode= query_error_code(thd, killed_status == THD::NOT_KILLED);
3288 if (write_to_binlog(trans_table, errcode))
3290 table->file->ha_release_auto_increment();
3291 DBUG_RETURN(1);
3294 table->file->ha_release_auto_increment();
3296 if (error)
3298 table->file->print_error(error,MYF(0));
3299 DBUG_RETURN(1);
3301 char buff[160];
3302 if (info.ignore)
3303 sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
3304 (ulong) (info.records - info.copied), (ulong) thd->cuted_fields);
3305 else
3306 sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
3307 (ulong) (info.deleted+info.updated), (ulong) thd->cuted_fields);
3308 thd->row_count_func= info.copied + info.deleted +
3309 ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
3310 info.touched : info.updated);
3312 id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
3313 thd->first_successful_insert_id_in_cur_stmt :
3314 (thd->arg_of_last_insert_id_function ?
3315 thd->first_successful_insert_id_in_prev_stmt :
3316 (info.copied ? autoinc_value_of_last_inserted_row : 0));
3317 ::my_ok(thd, (ulong) thd->row_count_func, id, buff);
3318 DBUG_RETURN(0);
3321 void select_insert::abort() {
3323 DBUG_ENTER("select_insert::abort");
3325 If the creation of the table failed (due to a syntax error, for
3326 example), no table will have been opened and therefore 'table'
3327 will be NULL. In that case, we still need to execute the rollback
3328 and the end of the function.
3330 if (table)
3332 bool changed, transactional_table;
3334 If we are not in prelocked mode, we end the bulk insert started
3335 before.
3337 if (!thd->prelocked_mode)
3338 table->file->ha_end_bulk_insert();
3341 If at least one row has been inserted/modified and will stay in
3342 the table (the table doesn't have transactions) we must write to
3343 the binlog (and the error code will make the slave stop).
3345 For many errors (example: we got a duplicate key error while
3346 inserting into a MyISAM table), no row will be added to the table,
3347 so passing the error to the slave will not help since there will
3348 be an error code mismatch (the inserts will succeed on the slave
3349 with no error).
3351 If table creation failed, the number of rows modified will also be
3352 zero, so no check for that is made.
3354 changed= (info.copied || info.deleted || info.updated);
3355 transactional_table= table->file->has_transactions();
3356 if (thd->transaction.stmt.modified_non_trans_table)
3358 if (mysql_bin_log.is_open())
3360 int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
3361 /* error of writing binary log is ignored */
3362 write_to_binlog(transactional_table, errcode);
3364 if (!thd->current_stmt_binlog_row_based && !can_rollback_data())
3365 thd->transaction.all.modified_non_trans_table= TRUE;
3366 if (changed)
3367 query_cache_invalidate3(thd, table, 1);
3369 DBUG_ASSERT(transactional_table || !changed ||
3370 thd->transaction.stmt.modified_non_trans_table);
3371 table->file->ha_release_auto_increment();
3374 DBUG_VOID_RETURN;
3377 int select_insert::write_to_binlog(bool is_trans, int errcode)
3379 /* It is only for statement mode */
3380 if (thd->current_stmt_binlog_row_based)
3381 return 0;
3383 return thd->binlog_query(THD::ROW_QUERY_TYPE,
3384 thd->query(), thd->query_length(),
3385 is_trans, FALSE, errcode);
3388 /* Override the select_insert::write_to_binlog */
3389 int select_create::write_to_binlog(bool is_trans, int errcode)
3391 /* It is only for statement mode */
3392 if (thd->current_stmt_binlog_row_based)
3393 return 0;
3396 WL#5370 Keep the compatibility between 5.1 master and 5.5 slave.
3397 Binlog a 'INSERT ... SELECT' statement only when it has the option
3398 'IF NOT EXISTS' and the table already exists as a base table.
3400 if ((create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS) &&
3401 create_info->table_existed)
3403 String query;
3404 int result;
3406 thd->binlog_start_trans_and_stmt();
3407 /* Binlog the CREATE TABLE IF NOT EXISTS statement */
3408 result= binlog_show_create_table(&table, 1, 0);
3409 if (result)
3410 return result;
3412 uint db_len= strlen(create_table->db);
3413 uint table_len= strlen(create_info->alias);
3414 uint select_len= thd->query_length() - thd->lex->create_select_pos;
3415 uint field_len= (table->s->fields - (field - table->field)) *
3416 (MAX_FIELD_NAME + 3);
3419 pre-allocating memory reduces the times of reallocating memory,
3420 when calling query.appen().
3421 40bytes is enough for other words("INSERT IGNORE INTO", etc.).
3423 if (query.real_alloc(40 + db_len + table_len + field_len + select_len))
3424 return 1;
3426 if (thd->lex->create_select_in_comment)
3427 query.append(STRING_WITH_LEN("/*! "));
3428 if (thd->lex->ignore)
3429 query.append(STRING_WITH_LEN("INSERT IGNORE INTO "));
3430 else if (thd->lex->duplicates == DUP_REPLACE)
3431 query.append(STRING_WITH_LEN("REPLACE INTO "));
3432 else
3433 query.append(STRING_WITH_LEN("INSERT INTO "));
3435 append_identifier(thd, &query, create_table->db, db_len);
3436 query.append(STRING_WITH_LEN("."));
3437 append_identifier(thd, &query, create_info->alias, table_len );
3438 query.append(STRING_WITH_LEN(" "));
3441 The insert items.
3442 Field is the the rightmost columns that the rows are inster in.
3444 query.append(STRING_WITH_LEN("("));
3445 for (Field **f= field ; *f ; f++)
3447 if (f != field)
3448 query.append(STRING_WITH_LEN(","));
3450 append_identifier(thd, &query, (*f)->field_name,
3451 strlen((*f)->field_name));
3453 query.append(STRING_WITH_LEN(") "));
3455 /* The SELECT clause*/
3456 DBUG_ASSERT(thd->lex->create_select_pos);
3457 if (thd->lex->create_select_start_with_brace)
3458 query.append(STRING_WITH_LEN("("));
3459 if (query.append(thd->query() + thd->lex->create_select_pos, select_len))
3460 return 1;
3463 Avoid to use thd->binlog_query() twice, otherwise it will print the unsafe
3464 warning twice.
3466 Query_log_event ev(thd, query.c_ptr_safe(), query.length(), is_trans,
3467 FALSE, errcode);
3468 return mysql_bin_log.write(&ev);
3470 else
3471 return select_insert::write_to_binlog(is_trans, errcode);
3474 /***************************************************************************
3475 CREATE TABLE (SELECT) ...
3476 ***************************************************************************/
3479 Create table from lists of fields and items (or just return TABLE
3480 object for pre-opened existing table).
3482 SYNOPSIS
3483 create_table_from_items()
3484 thd in Thread object
3485 create_info in Create information (like MAX_ROWS, ENGINE or
3486 temporary table flag)
3487 create_table in Pointer to TABLE_LIST object providing database
3488 and name for table to be created or to be open
3489 alter_info in/out Initial list of columns and indexes for the table
3490 to be created
3491 items in List of items which should be used to produce rest
3492 of fields for the table (corresponding fields will
3493 be added to the end of alter_info->create_list)
3494 lock out Pointer to the MYSQL_LOCK object for table created
3495 (or open temporary table) will be returned in this
3496 parameter. Since this table is not included in
3497 THD::lock caller is responsible for explicitly
3498 unlocking this table.
3499 hooks
3501 NOTES
3502 This function behaves differently for base and temporary tables:
3503 - For base table we assume that either table exists and was pre-opened
3504 and locked at open_and_lock_tables() stage (and in this case we just
3505 emit error or warning and return pre-opened TABLE object) or special
3506 placeholder was put in table cache that guarantees that this table
3507 won't be created or opened until the placeholder will be removed
3508 (so there is an exclusive lock on this table).
3509 - We don't pre-open existing temporary table, instead we either open
3510 or create and then open table in this function.
3512 Since this function contains some logic specific to CREATE TABLE ...
3513 SELECT it should be changed before it can be used in other contexts.
3515 RETURN VALUES
3516 non-zero Pointer to TABLE object for table created or opened
3517 0 Error
3520 static TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
3521 TABLE_LIST *create_table,
3522 Alter_info *alter_info,
3523 List<Item> *items,
3524 MYSQL_LOCK **lock,
3525 TABLEOP_HOOKS *hooks)
3527 TABLE tmp_table; // Used during 'Create_field()'
3528 TABLE_SHARE share;
3529 TABLE *table= 0;
3530 uint select_field_count= items->elements;
3531 /* Add selected items to field list */
3532 List_iterator_fast<Item> it(*items);
3533 Item *item;
3534 Field *tmp_field;
3535 bool not_used;
3536 DBUG_ENTER("create_table_from_items");
3538 tmp_table.alias= 0;
3539 tmp_table.timestamp_field= 0;
3540 tmp_table.s= &share;
3541 init_tmp_table_share(thd, &share, "", 0, "", "");
3543 tmp_table.s->db_create_options=0;
3544 tmp_table.s->blob_ptr_size= portable_sizeof_char_ptr;
3545 tmp_table.s->db_low_byte_first=
3546 test(create_info->db_type == myisam_hton ||
3547 create_info->db_type == heap_hton);
3548 tmp_table.null_row=tmp_table.maybe_null=0;
3550 while ((item=it++))
3552 Create_field *cr_field;
3553 Field *field, *def_field;
3554 if (item->type() == Item::FUNC_ITEM)
3555 if (item->result_type() != STRING_RESULT)
3556 field= item->tmp_table_field(&tmp_table);
3557 else
3558 field= item->tmp_table_field_from_field_type(&tmp_table, 0);
3559 else
3560 field= create_tmp_field(thd, &tmp_table, item, item->type(),
3561 (Item ***) 0, &tmp_field, &def_field, 0, 0, 0, 0,
3563 if (!field ||
3564 !(cr_field=new Create_field(field,(item->type() == Item::FIELD_ITEM ?
3565 ((Item_field *)item)->field :
3566 (Field*) 0))))
3567 DBUG_RETURN(0);
3568 if (item->maybe_null)
3569 cr_field->flags &= ~NOT_NULL_FLAG;
3570 alter_info->create_list.push_back(cr_field);
3573 DBUG_EXECUTE_IF("sleep_create_select_before_create", my_sleep(6000000););
3576 Create and lock table.
3578 Note that we either creating (or opening existing) temporary table or
3579 creating base table on which name we have exclusive lock. So code below
3580 should not cause deadlocks or races.
3582 We don't log the statement, it will be logged later.
3584 If this is a HEAP table, the automatic DELETE FROM which is written to the
3585 binlog when a HEAP table is opened for the first time since startup, must
3586 not be written: 1) it would be wrong (imagine we're in CREATE SELECT: we
3587 don't want to delete from it) 2) it would be written before the CREATE
3588 TABLE, which is a wrong order. So we keep binary logging disabled when we
3589 open_table().
3592 tmp_disable_binlog(thd);
3593 if (!mysql_create_table_no_lock(thd, create_table->db,
3594 create_table->table_name,
3595 create_info, alter_info, 0,
3596 select_field_count))
3598 if (create_info->table_existed &&
3599 !(create_info->options & HA_LEX_CREATE_TMP_TABLE))
3602 This means that someone created table underneath server
3603 or it was created via different mysqld front-end to the
3604 cluster. We don't have much options but throw an error.
3606 my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
3607 DBUG_RETURN(0);
3610 DBUG_EXECUTE_IF("sleep_create_select_before_open", my_sleep(6000000););
3612 if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
3614 VOID(pthread_mutex_lock(&LOCK_open));
3615 if (reopen_name_locked_table(thd, create_table, FALSE))
3617 quick_rm_table(create_info->db_type, create_table->db,
3618 table_case_name(create_info, create_table->table_name),
3621 else
3622 table= create_table->table;
3623 VOID(pthread_mutex_unlock(&LOCK_open));
3625 else
3627 if (!(table= open_table(thd, create_table, thd->mem_root, (bool*) 0,
3628 MYSQL_OPEN_TEMPORARY_ONLY)) &&
3629 !create_info->table_existed)
3632 This shouldn't happen as creation of temporary table should make
3633 it preparable for open. But let us do close_temporary_table() here
3634 just in case.
3636 drop_temporary_table(thd, create_table);
3640 reenable_binlog(thd);
3641 if (!table) // open failed
3642 DBUG_RETURN(0);
3645 DBUG_EXECUTE_IF("sleep_create_select_before_lock", my_sleep(6000000););
3647 table->reginfo.lock_type=TL_WRITE;
3648 hooks->prelock(&table, 1); // Call prelock hooks
3649 if (! ((*lock)= mysql_lock_tables(thd, &table, 1,
3650 MYSQL_LOCK_IGNORE_FLUSH, &not_used)) ||
3651 hooks->postlock(&table, 1))
3653 if (*lock)
3655 mysql_unlock_tables(thd, *lock);
3656 *lock= 0;
3659 if (!create_info->table_existed)
3660 drop_open_table(thd, table, create_table->db, create_table->table_name);
3661 DBUG_RETURN(0);
3663 DBUG_RETURN(table);
3668 select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
3670 MYSQL_LOCK *extra_lock= NULL;
3671 DBUG_ENTER("select_create::prepare");
3673 TABLEOP_HOOKS *hook_ptr= NULL;
3675 For row-based replication, the CREATE-SELECT statement is written
3676 in two pieces: the first one contain the CREATE TABLE statement
3677 necessary to create the table and the second part contain the rows
3678 that should go into the table.
3680 For non-temporary tables, the start of the CREATE-SELECT
3681 implicitly commits the previous transaction, and all events
3682 forming the statement will be stored the transaction cache. At end
3683 of the statement, the entire statement is committed as a
3684 transaction, and all events are written to the binary log.
3686 On the master, the table is locked for the duration of the
3687 statement, but since the CREATE part is replicated as a simple
3688 statement, there is no way to lock the table for accesses on the
3689 slave. Hence, we have to hold on to the CREATE part of the
3690 statement until the statement has finished.
3692 class MY_HOOKS : public TABLEOP_HOOKS {
3693 public:
3694 MY_HOOKS(select_create *x, TABLE_LIST *create_table,
3695 TABLE_LIST *select_tables)
3696 : ptr(x), all_tables(*create_table)
3698 all_tables.next_global= select_tables;
3701 private:
3702 virtual int do_postlock(TABLE **tables, uint count)
3704 THD *thd= const_cast<THD*>(ptr->get_thd());
3705 if (int error= decide_logging_format(thd, &all_tables))
3706 return error;
3708 TABLE const *const table = *tables;
3709 if (thd->current_stmt_binlog_row_based &&
3710 !table->s->tmp_table &&
3711 !ptr->get_create_info()->table_existed)
3713 int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
3714 if (int error= ptr->binlog_show_create_table(tables, count, errcode))
3715 return error;
3717 return 0;
3720 select_create *ptr;
3721 TABLE_LIST all_tables;
3724 MY_HOOKS hooks(this, create_table, select_tables);
3725 hook_ptr= &hooks;
3727 unit= u;
3730 Start a statement transaction before the create if we are using
3731 row-based replication for the statement. If we are creating a
3732 temporary table, we need to start a statement transaction.
3734 if ((thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) == 0 &&
3735 thd->current_stmt_binlog_row_based &&
3736 mysql_bin_log.is_open())
3738 thd->binlog_start_trans_and_stmt();
3741 DBUG_EXECUTE_IF("sleep_create_select_before_check_if_exists", my_sleep(6000000););
3743 if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE) &&
3744 (create_table->table && create_table->table->db_stat))
3746 /* Table already exists and was open at open_and_lock_tables() stage. */
3747 if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
3749 /* Mark that table existed */
3750 create_info->table_existed= 1;
3751 push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
3752 ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
3753 create_table->table_name);
3754 if (thd->current_stmt_binlog_row_based)
3756 int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
3757 binlog_show_create_table(&(create_table->table), 1, errcode);
3759 table= create_table->table;
3761 else
3763 my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
3764 DBUG_RETURN(-1);
3767 else
3768 if (!(table= create_table_from_items(thd, create_info, create_table,
3769 alter_info, &values,
3770 &extra_lock, hook_ptr)))
3771 /* abort() deletes table */
3772 DBUG_RETURN(-1);
3774 if (extra_lock)
3776 DBUG_ASSERT(m_plock == NULL);
3778 if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
3779 m_plock= &m_lock;
3780 else
3781 m_plock= &thd->extra_lock;
3783 *m_plock= extra_lock;
3786 if (table->s->fields < values.elements)
3788 my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
3789 DBUG_RETURN(-1);
3792 /* First field to copy */
3793 field= table->field+table->s->fields - values.elements;
3795 /* Mark all fields that are given values */
3796 for (Field **f= field ; *f ; f++)
3797 bitmap_set_bit(table->write_set, (*f)->field_index);
3799 /* Don't set timestamp if used */
3800 table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
3801 table->next_number_field=table->found_next_number_field;
3803 restore_record(table,s->default_values); // Get empty record
3804 thd->cuted_fields=0;
3805 if (info.ignore || info.handle_duplicates != DUP_ERROR)
3806 table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
3807 if (info.handle_duplicates == DUP_REPLACE &&
3808 (!table->triggers || !table->triggers->has_delete_triggers()))
3809 table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
3810 if (info.handle_duplicates == DUP_UPDATE)
3811 table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
3812 if (!thd->prelocked_mode)
3813 table->file->ha_start_bulk_insert((ha_rows) 0);
3814 thd->abort_on_warning= (!info.ignore &&
3815 (thd->variables.sql_mode &
3816 (MODE_STRICT_TRANS_TABLES |
3817 MODE_STRICT_ALL_TABLES)));
3818 if (check_that_all_fields_are_given_values(thd, table, table_list))
3819 DBUG_RETURN(1);
3820 table->mark_columns_needed_for_insert();
3821 table->file->extra(HA_EXTRA_WRITE_CACHE);
3822 DBUG_RETURN(0);
3826 select_create::binlog_show_create_table(TABLE **tables, uint count, int errcode)
3829 Note 1: We generate a CREATE TABLE statement for the
3830 created table by calling store_create_info() (behaves as SHOW
3831 CREATE TABLE). In the event of an error, nothing should be
3832 written to the binary log, even if the table is non-transactional;
3833 therefore we pretend that the generated CREATE TABLE statement is
3834 for a transactional table. The event will then be put in the
3835 transaction cache, and any subsequent events (e.g., table-map
3836 events and binrow events) will also be put there. We can then use
3837 ha_autocommit_or_rollback() to either throw away the entire
3838 kaboodle of events, or write them to the binary log.
3840 We write the CREATE TABLE statement here and not in prepare()
3841 since there potentially are sub-selects or accesses to information
3842 schema that will do a close_thread_tables(), destroying the
3843 statement transaction cache.
3845 DBUG_ASSERT(tables && *tables && count > 0);
3847 char buf[2048];
3848 String query(buf, sizeof(buf), system_charset_info);
3849 int result;
3850 TABLE_LIST tmp_table_list;
3852 memset(&tmp_table_list, 0, sizeof(tmp_table_list));
3853 tmp_table_list.table = *tables;
3854 query.length(0); // Have to zero it since constructor doesn't
3856 result= store_create_info(thd, &tmp_table_list, &query, create_info,
3857 /* show_database */ TRUE);
3858 DBUG_ASSERT(result == 0); /* store_create_info() always return 0 */
3860 if (mysql_bin_log.is_open())
3862 result= thd->binlog_query(THD::STMT_QUERY_TYPE,
3863 query.ptr(), query.length(),
3864 /* is_trans */ TRUE,
3865 /* suppress_use */ FALSE,
3866 errcode);
3868 return result;
3871 void select_create::store_values(List<Item> &values)
3873 fill_record_n_invoke_before_triggers(thd, field, values, 1,
3874 table->triggers, TRG_EVENT_INSERT);
3878 void select_create::send_error(uint errcode,const char *err)
3880 DBUG_ENTER("select_create::send_error");
3882 DBUG_PRINT("info",
3883 ("Current statement %s row-based",
3884 thd->current_stmt_binlog_row_based ? "is" : "is NOT"));
3885 DBUG_PRINT("info",
3886 ("Current table (at 0x%lu) %s a temporary (or non-existant) table",
3887 (ulong) table,
3888 table && !table->s->tmp_table ? "is NOT" : "is"));
3889 DBUG_PRINT("info",
3890 ("Table %s prior to executing this statement",
3891 get_create_info()->table_existed ? "existed" : "did not exist"));
3894 This will execute any rollbacks that are necessary before writing
3895 the transcation cache.
3897 We disable the binary log since nothing should be written to the
3898 binary log. This disabling is important, since we potentially do
3899 a "roll back" of non-transactional tables by removing the table,
3900 and the actual rollback might generate events that should not be
3901 written to the binary log.
3904 tmp_disable_binlog(thd);
3905 select_insert::send_error(errcode, err);
3906 reenable_binlog(thd);
3908 DBUG_VOID_RETURN;
3912 bool select_create::send_eof()
3914 bool tmp=select_insert::send_eof();
3915 if (tmp)
3916 abort();
3917 else
3920 Do an implicit commit at end of statement for non-temporary
3921 tables. This can fail, but we should unlock the table
3922 nevertheless.
3924 if (!table->s->tmp_table)
3926 ha_autocommit_or_rollback(thd, 0);
3927 end_active_trans(thd);
3930 table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
3931 table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
3932 if (m_plock)
3934 mysql_unlock_tables(thd, *m_plock);
3935 *m_plock= NULL;
3936 m_plock= NULL;
3939 return tmp;
3943 void select_create::abort()
3945 DBUG_ENTER("select_create::abort");
3948 In select_insert::abort() we roll back the statement, including
3949 truncating the transaction cache of the binary log. To do this, we
3950 pretend that the statement is transactional, even though it might
3951 be the case that it was not.
3953 We roll back the statement prior to deleting the table and prior
3954 to releasing the lock on the table, since there might be potential
3955 for failure if the rollback is executed after the drop or after
3956 unlocking the table.
3958 We also roll back the statement regardless of whether the creation
3959 of the table succeeded or not, since we need to reset the binary
3960 log state.
3962 tmp_disable_binlog(thd);
3963 select_insert::abort();
3964 thd->transaction.stmt.modified_non_trans_table= FALSE;
3965 reenable_binlog(thd);
3966 /* possible error of writing binary log is ignored deliberately */
3967 (void)thd->binlog_flush_pending_rows_event(TRUE);
3969 if (m_plock)
3971 mysql_unlock_tables(thd, *m_plock);
3972 *m_plock= NULL;
3973 m_plock= NULL;
3976 if (table)
3978 if (thd->lex->sql_command == SQLCOM_CREATE_TABLE &&
3979 thd->current_stmt_binlog_row_based &&
3980 !(thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) &&
3981 mysql_bin_log.is_open())
3984 This should be removed after BUG#47899.
3986 mysql_bin_log.reset_gathered_updates(thd);
3989 table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
3990 table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
3991 if (!create_info->table_existed)
3992 drop_open_table(thd, table, create_table->db, create_table->table_name);
3993 table=0; // Safety
3995 DBUG_VOID_RETURN;
3999 /*****************************************************************************
4000 Instansiate templates
4001 *****************************************************************************/
4003 #ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
4004 template class List_iterator_fast<List_item>;
4005 #ifndef EMBEDDED_LIBRARY
4006 template class I_List<Delayed_insert>;
4007 template class I_List_iterator<Delayed_insert>;
4008 template class I_List<delayed_row>;
4009 #endif /* EMBEDDED_LIBRARY */
4010 #endif /* HAVE_EXPLICIT_TEMPLATE_INSTANTIATION */