2 Copyright (c) 2004, 2012, Oracle and/or its affiliates. All rights reserved.
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 as published by the Free Software Foundation; version 2 of
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 #ifdef USE_PRAGMA_IMPLEMENTATION
20 #pragma implementation // gcc: Class implementation
23 #include "mysql_priv.h"
26 #include "ha_archive.h"
29 #include <mysql/plugin.h>
32 First, if you want to understand storage engines you should look at
33 ha_example.cc and ha_example.h.
35 This example was written as a test case for a customer who needed
36 a storage engine without indexes that could compress data very well.
37 So, welcome to a completely compressed storage engine. This storage
38 engine only does inserts. No replace, deletes, or updates. All reads are
39 complete table scans. Compression is done through a combination of packing
40 and making use of the zlib library
42 We keep a file pointer open for each instance of ha_archive for each read
43 but for writes we keep one open file handle just for that. We flush it
44 only if we have a read occur. azip handles compressing lots of records
45 at once much better than doing lots of little records between writes.
46 It is possible to not lock on writes but this would then mean we couldn't
47 handle bulk inserts as well (that is if someone was trying to read at
48 the same time since we would want to flush).
50 A "meta" file is kept alongside the data file. This file serves two purposes.
51 The first purpose is to track the number of rows in the table. The second
52 purpose is to determine if the table was closed properly or not. When the
53 meta file is first opened it is marked as dirty. It is opened when the table
54 itself is opened for writing. When the table is closed the new count for rows
55 is written to the meta file and the file is marked as clean. If the meta file
56 is opened and it is marked as dirty, it is assumed that a crash occurred. At
57 this point an error occurs and the user is told to rebuild the file.
58 A rebuild scans the rows and rewrites the meta file. If corruption is found
59 in the data file then the meta file is not repaired.
61 At some point a recovery method for such a drastic case needs to be devised.
63 Locks are row level, and you will get a consistent read.
65 For performance as far as table scans go it is quite fast. I don't have
66 good numbers but locally it has out performed both Innodb and MyISAM. For
67 Innodb the question will be if the table can be fit into the buffer
68 pool. For MyISAM its a question of how much the file system caches the
69 MyISAM file. With enough free memory MyISAM is faster. Its only when the OS
70 doesn't have enough memory to cache entire table that archive turns out
73 Examples between MyISAM (packed) and Archive.
75 Table with 76695844 identical rows:
76 29680807 a_archive.ARZ
80 Table with 8991478 rows (all of Slashdot's comments):
81 1922964506 comment_archive.ARZ
82 2944970297 comment_text.MYD
86 Allow users to set compression level.
87 Allow adjustable block size.
88 Implement versioning, should be easy.
89 Allow for errors, find a way to mark bad rows.
90 Add optional feature so that rows can be flushed at interval (which will cause less
91 compression but may speed up ordered searches).
92 Checkpoint the meta file to allow for faster rebuilds.
93 Option to allow for dirty reads, this would lower the sync calls, which would make
94 inserts a lot faster, but would mean highly arbitrary reads.
99 /* Variables for archive share methods */
100 pthread_mutex_t archive_mutex
;
101 static HASH archive_open_tables
;
103 /* The file extension */
104 #define ARZ ".ARZ" // The data file
105 #define ARN ".ARN" // Files used during an optimize call
106 #define ARM ".ARM" // Meta file (deprecated)
111 #define DATA_BUFFER_SIZE 2 // Size of the data used in the data file
112 #define ARCHIVE_CHECK_HEADER 254 // The number we use to determine corruption
114 /* Static declarations for handlerton */
115 static handler
*archive_create_handler(handlerton
*hton
,
118 int archive_discover(handlerton
*hton
, THD
* thd
, const char *db
,
124 Number of rows that will force a bulk insert.
126 #define ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT 2
129 Size of header used for row
131 #define ARCHIVE_ROW_HEADER_SIZE 4
133 static handler
*archive_create_handler(handlerton
*hton
,
137 return new (mem_root
) ha_archive(hton
, table
);
141 Used for hash table that tracks open tables.
143 static uchar
* archive_get_key(ARCHIVE_SHARE
*share
, size_t *length
,
144 my_bool not_used
__attribute__((unused
)))
146 *length
=share
->table_name_length
;
147 return (uchar
*) share
->table_name
;
152 Initialize the archive handler.
163 int archive_db_init(void *p
)
165 DBUG_ENTER("archive_db_init");
166 handlerton
*archive_hton
;
168 archive_hton
= (handlerton
*)p
;
169 archive_hton
->state
= SHOW_OPTION_YES
;
170 archive_hton
->db_type
= DB_TYPE_ARCHIVE_DB
;
171 archive_hton
->create
= archive_create_handler
;
172 archive_hton
->flags
= HTON_NO_FLAGS
;
173 archive_hton
->discover
= archive_discover
;
175 if (pthread_mutex_init(&archive_mutex
, MY_MUTEX_INIT_FAST
))
177 if (hash_init(&archive_open_tables
, table_alias_charset
, 32, 0, 0,
178 (hash_get_key
) archive_get_key
, 0, 0))
180 VOID(pthread_mutex_destroy(&archive_mutex
));
191 Release the archive handler.
201 int archive_db_done(void *p
)
203 hash_free(&archive_open_tables
);
204 VOID(pthread_mutex_destroy(&archive_mutex
));
210 ha_archive::ha_archive(handlerton
*hton
, TABLE_SHARE
*table_arg
)
211 :handler(hton
, table_arg
), delayed_insert(0), bulk_insert(0)
213 /* Set our original buffer from pre-allocated memory */
214 buffer
.set((char *)byte_buffer
, IO_SIZE
, system_charset_info
);
216 /* The size of the offset value we will use for position() */
217 ref_length
= sizeof(my_off_t
);
218 archive_reader_open
= FALSE
;
221 int archive_discover(handlerton
*hton
, THD
* thd
, const char *db
,
226 DBUG_ENTER("archive_discover");
227 DBUG_PRINT("archive_discover", ("db: %s, name: %s", db
, name
));
228 azio_stream frm_stream
;
229 char az_file
[FN_REFLEN
];
233 fn_format(az_file
, name
, db
, ARZ
, MY_REPLACE_EXT
| MY_UNPACK_FILENAME
);
235 if (!(my_stat(az_file
, &file_stat
, MYF(0))))
238 if (!(azopen(&frm_stream
, az_file
, O_RDONLY
|O_BINARY
)))
240 if (errno
== EROFS
|| errno
== EACCES
)
241 DBUG_RETURN(my_errno
= errno
);
242 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE
);
245 if (frm_stream
.frm_length
== 0)
248 frm_ptr
= (char *)my_malloc(sizeof(char) * frm_stream
.frm_length
, MYF(0));
249 azread_frm(&frm_stream
, frm_ptr
);
250 azclose(&frm_stream
);
252 *frmlen
= frm_stream
.frm_length
;
253 *frmblob
= (uchar
*) frm_ptr
;
262 This method reads the header of a datafile and returns whether or not it was successful.
264 int ha_archive::read_data_header(azio_stream
*file_to_read
)
268 uchar data_buffer
[DATA_BUFFER_SIZE
];
269 DBUG_ENTER("ha_archive::read_data_header");
271 if (azrewind(file_to_read
) == -1)
272 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE
);
274 if (file_to_read
->version
>= 3)
276 /* Everything below this is just legacy to version 2< */
278 DBUG_PRINT("ha_archive", ("Reading legacy data header"));
280 ret
= azread(file_to_read
, data_buffer
, DATA_BUFFER_SIZE
, &error
);
282 if (ret
!= DATA_BUFFER_SIZE
)
284 DBUG_PRINT("ha_archive", ("Reading, expected %d got %lu",
285 DATA_BUFFER_SIZE
, ret
));
291 DBUG_PRINT("ha_archive", ("Compression error (%d)", error
));
295 DBUG_PRINT("ha_archive", ("Check %u", data_buffer
[0]));
296 DBUG_PRINT("ha_archive", ("Version %u", data_buffer
[1]));
298 if ((data_buffer
[0] != (uchar
)ARCHIVE_CHECK_HEADER
) &&
299 (data_buffer
[1] != (uchar
)ARCHIVE_VERSION
))
300 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE
);
307 We create the shared memory space that we will use for the open table.
308 No matter what we try to get or create a share. This is so that a repair
309 table operation can occur.
311 See ha_example.cc for a longer description.
313 ARCHIVE_SHARE
*ha_archive::get_share(const char *table_name
, int *rc
)
316 DBUG_ENTER("ha_archive::get_share");
318 pthread_mutex_lock(&archive_mutex
);
319 length
=(uint
) strlen(table_name
);
321 if (!(share
=(ARCHIVE_SHARE
*) hash_search(&archive_open_tables
,
326 azio_stream archive_tmp
;
328 if (!my_multi_malloc(MYF(MY_WME
| MY_ZEROFILL
),
329 &share
, sizeof(*share
),
333 pthread_mutex_unlock(&archive_mutex
);
334 *rc
= HA_ERR_OUT_OF_MEM
;
339 share
->table_name_length
= length
;
340 share
->table_name
= tmp_name
;
341 share
->crashed
= FALSE
;
342 share
->archive_write_open
= FALSE
;
343 fn_format(share
->data_file_name
, table_name
, "",
344 ARZ
, MY_REPLACE_EXT
| MY_UNPACK_FILENAME
);
345 strmov(share
->table_name
, table_name
);
346 DBUG_PRINT("ha_archive", ("Data File %s",
347 share
->data_file_name
));
349 We will use this lock for rows.
351 VOID(pthread_mutex_init(&share
->mutex
,MY_MUTEX_INIT_FAST
));
354 We read the meta file, but do not mark it dirty. Since we are not
355 doing a write we won't mark it dirty (and we won't open it for
356 anything but reading... open it for write and we will generate null
359 if (!(azopen(&archive_tmp
, share
->data_file_name
, O_RDONLY
|O_BINARY
)))
361 *rc
= my_errno
? my_errno
: -1;
362 pthread_mutex_unlock(&archive_mutex
);
363 my_free(share
, MYF(0));
366 stats
.auto_increment_value
= archive_tmp
.auto_increment
+ 1;
367 share
->rows_recorded
= (ha_rows
)archive_tmp
.rows
;
368 share
->crashed
= archive_tmp
.dirty
;
370 If archive version is less than 3, It should be upgraded before
373 if (archive_tmp
.version
< ARCHIVE_VERSION
)
374 *rc
= HA_ERR_TABLE_NEEDS_UPGRADE
;
375 azclose(&archive_tmp
);
377 VOID(my_hash_insert(&archive_open_tables
, (uchar
*) share
));
378 thr_lock_init(&share
->lock
);
381 DBUG_PRINT("ha_archive", ("archive table %.*s has %d open handles now",
382 share
->table_name_length
, share
->table_name
,
385 *rc
= HA_ERR_CRASHED_ON_USAGE
;
386 pthread_mutex_unlock(&archive_mutex
);
394 See ha_example.cc for a description.
396 int ha_archive::free_share()
399 DBUG_ENTER("ha_archive::free_share");
400 DBUG_PRINT("ha_archive",
401 ("archive table %.*s has %d open handles on entrance",
402 share
->table_name_length
, share
->table_name
,
405 pthread_mutex_lock(&archive_mutex
);
406 if (!--share
->use_count
)
408 hash_delete(&archive_open_tables
, (uchar
*) share
);
409 thr_lock_delete(&share
->lock
);
410 VOID(pthread_mutex_destroy(&share
->mutex
));
412 We need to make sure we don't reset the crashed state.
413 If we open a crashed file, wee need to close it as crashed unless
414 it has been repaired.
415 Since we will close the data down after this, we go on and count
418 if (share
->archive_write_open
)
420 if (azclose(&(share
->archive_write
)))
423 my_free((uchar
*) share
, MYF(0));
425 pthread_mutex_unlock(&archive_mutex
);
430 int ha_archive::init_archive_writer()
432 DBUG_ENTER("ha_archive::init_archive_writer");
434 It is expensive to open and close the data files and since you can't have
435 a gzip file that can be both read and written we keep a writer open
436 that is shared amoung all open tables.
438 if (!(azopen(&(share
->archive_write
), share
->data_file_name
,
441 DBUG_PRINT("ha_archive", ("Could not open archive write file"));
442 share
->crashed
= TRUE
;
445 share
->archive_write_open
= TRUE
;
452 No locks are required because it is associated with just one handler instance
454 int ha_archive::init_archive_reader()
456 DBUG_ENTER("ha_archive::init_archive_reader");
458 It is expensive to open and close the data files and since you can't have
459 a gzip file that can be both read and written we keep a writer open
460 that is shared amoung all open tables.
462 if (!archive_reader_open
)
464 if (!(azopen(&archive
, share
->data_file_name
, O_RDONLY
|O_BINARY
)))
466 DBUG_PRINT("ha_archive", ("Could not open archive read file"));
467 share
->crashed
= TRUE
;
470 archive_reader_open
= TRUE
;
478 We just implement one additional file extension.
480 static const char *ha_archive_exts
[] = {
485 const char **ha_archive::bas_ext() const
487 return ha_archive_exts
;
492 When opening a file we:
493 Create/get our shared structure.
495 We open the file we will read from.
497 int ha_archive::open(const char *name
, int mode
, uint open_options
)
500 DBUG_ENTER("ha_archive::open");
502 DBUG_PRINT("ha_archive", ("archive table was opened for crash: %s",
503 (open_options
& HA_OPEN_FOR_REPAIR
) ? "yes" : "no"));
504 share
= get_share(name
, &rc
);
507 Allow open on crashed table in repair mode only.
508 Block open on 5.0 ARCHIVE table. Though we have almost all
509 routines to access these tables, they were not well tested.
510 For now we have to refuse to open such table to avoid
517 case HA_ERR_CRASHED_ON_USAGE
:
518 if (open_options
& HA_OPEN_FOR_REPAIR
)
521 case HA_ERR_TABLE_NEEDS_UPGRADE
:
530 record_buffer
= create_record_buffer(table
->s
->reclength
+
531 ARCHIVE_ROW_HEADER_SIZE
);
536 DBUG_RETURN(HA_ERR_OUT_OF_MEM
);
539 thr_lock_data_init(&share
->lock
, &lock
, NULL
);
541 DBUG_PRINT("ha_archive", ("archive table was crashed %s",
542 rc
== HA_ERR_CRASHED_ON_USAGE
? "yes" : "no"));
543 if (rc
== HA_ERR_CRASHED_ON_USAGE
&& open_options
& HA_OPEN_FOR_REPAIR
)
560 We first close this storage engines file handle to the archive and
561 then remove our reference count to the table (and possibly free it
569 int ha_archive::close(void)
572 DBUG_ENTER("ha_archive::close");
574 destroy_record_buffer(record_buffer
);
576 /* First close stream */
577 if (archive_reader_open
)
579 if (azclose(&archive
))
582 /* then also close share */
590 We create our data file here. The format is pretty simple.
591 You can read about the format of the data file above.
592 Unlike other storage engines we do not "pack" our data. Since we
593 are about to do a general compression, packing would just be a waste of
594 CPU time. If the table has blobs they are written after the row in the order
598 int ha_archive::create(const char *name
, TABLE
*table_arg
,
599 HA_CREATE_INFO
*create_info
)
601 char name_buff
[FN_REFLEN
];
602 char linkname
[FN_REFLEN
];
604 azio_stream create_stream
; /* Archive file we are working with */
605 File frm_file
; /* File handler for readers */
606 MY_STAT file_stat
; // Stat information for the data file
609 DBUG_ENTER("ha_archive::create");
611 stats
.auto_increment_value
= create_info
->auto_increment_value
;
613 for (uint key
= 0; key
< table_arg
->s
->keys
; key
++)
615 KEY
*pos
= table_arg
->key_info
+key
;
616 KEY_PART_INFO
*key_part
= pos
->key_part
;
617 KEY_PART_INFO
*key_part_end
= key_part
+ pos
->key_parts
;
619 for (; key_part
!= key_part_end
; key_part
++)
621 Field
*field
= key_part
->field
;
623 if (!(field
->flags
& AUTO_INCREMENT_FLAG
))
626 DBUG_PRINT("ha_archive", ("Index error in creating archive table"));
633 We reuse name_buff since it is available.
635 if (create_info
->data_file_name
&& create_info
->data_file_name
[0] != '#')
637 DBUG_PRINT("ha_archive", ("archive will create stream file %s",
638 create_info
->data_file_name
));
640 fn_format(name_buff
, create_info
->data_file_name
, "", ARZ
,
641 MY_REPLACE_EXT
| MY_UNPACK_FILENAME
);
642 fn_format(linkname
, name
, "", ARZ
,
643 MY_REPLACE_EXT
| MY_UNPACK_FILENAME
);
647 fn_format(name_buff
, name
, "", ARZ
,
648 MY_REPLACE_EXT
| MY_UNPACK_FILENAME
);
653 There is a chance that the file was "discovered". In this case
654 just use whatever file is there.
656 if (!(my_stat(name_buff
, &file_stat
, MYF(0))))
659 if (!(azopen(&create_stream
, name_buff
, O_CREAT
|O_RDWR
|O_BINARY
)))
666 my_symlink(name_buff
, linkname
, MYF(0));
667 fn_format(name_buff
, name
, "", ".frm",
668 MY_REPLACE_EXT
| MY_UNPACK_FILENAME
);
671 Here is where we open up the frm and pass it to archive to store
673 if ((frm_file
= my_open(name_buff
, O_RDONLY
, MYF(0))) > 0)
675 if (!my_fstat(frm_file
, &file_stat
, MYF(MY_WME
)))
677 frm_ptr
= (uchar
*)my_malloc(sizeof(uchar
) * file_stat
.st_size
, MYF(0));
680 my_read(frm_file
, frm_ptr
, file_stat
.st_size
, MYF(0));
681 azwrite_frm(&create_stream
, (char *)frm_ptr
, file_stat
.st_size
);
682 my_free((uchar
*)frm_ptr
, MYF(0));
685 my_close(frm_file
, MYF(0));
688 if (create_info
->comment
.str
)
689 azwrite_comment(&create_stream
, create_info
->comment
.str
,
690 create_info
->comment
.length
);
693 Yes you need to do this, because the starting value
694 for the autoincrement may not be zero.
696 create_stream
.auto_increment
= stats
.auto_increment_value
?
697 stats
.auto_increment_value
- 1 : 0;
698 if (azclose(&create_stream
))
707 DBUG_PRINT("ha_archive", ("Creating File %s", name_buff
));
708 DBUG_PRINT("ha_archive", ("Creating Link %s", linkname
));
716 /* Return error number, if we got one */
717 DBUG_RETURN(error
? error
: -1);
721 This is where the actual row is written out.
723 int ha_archive::real_write_row(uchar
*buf
, azio_stream
*writer
)
726 unsigned int r_pack_length
;
727 DBUG_ENTER("ha_archive::real_write_row");
729 /* We pack the row for writing */
730 r_pack_length
= pack_row(buf
);
732 written
= azwrite(writer
, record_buffer
->buffer
, r_pack_length
);
733 if (written
!= r_pack_length
)
735 DBUG_PRINT("ha_archive", ("Wrote %d bytes expected %d",
737 (uint32
)r_pack_length
));
741 if (!delayed_insert
|| !bulk_insert
)
749 Calculate max length needed for row. This includes
750 the bytes required for the length in the header.
753 uint32
ha_archive::max_row_length(const uchar
*buf
)
755 uint32 length
= (uint32
)(table
->s
->reclength
+ table
->s
->fields
*2);
756 length
+= ARCHIVE_ROW_HEADER_SIZE
;
759 for (ptr
= table
->s
->blob_field
, end
=ptr
+ table
->s
->blob_fields
;
763 if (!table
->field
[*ptr
]->is_null())
764 length
+= 2 + ((Field_blob
*)table
->field
[*ptr
])->get_length();
771 unsigned int ha_archive::pack_row(uchar
*record
)
775 DBUG_ENTER("ha_archive::pack_row");
778 if (fix_rec_buff(max_row_length(record
)))
779 DBUG_RETURN(HA_ERR_OUT_OF_MEM
); /* purecov: inspected */
782 memcpy(record_buffer
->buffer
+ARCHIVE_ROW_HEADER_SIZE
,
783 record
, table
->s
->null_bytes
);
784 ptr
= record_buffer
->buffer
+ table
->s
->null_bytes
+ ARCHIVE_ROW_HEADER_SIZE
;
786 for (Field
**field
=table
->field
; *field
; field
++)
788 if (!((*field
)->is_null()))
789 ptr
= (*field
)->pack(ptr
, record
+ (*field
)->offset(record
));
792 int4store(record_buffer
->buffer
, (int)(ptr
- record_buffer
->buffer
-
793 ARCHIVE_ROW_HEADER_SIZE
));
794 DBUG_PRINT("ha_archive",("Pack row length %u", (unsigned int)
795 (ptr
- record_buffer
->buffer
-
796 ARCHIVE_ROW_HEADER_SIZE
)));
798 DBUG_RETURN((unsigned int) (ptr
- record_buffer
->buffer
));
803 Look at ha_archive::open() for an explanation of the row format.
804 Here we just write out the row.
806 Wondering about start_bulk_insert()? We don't implement it for
807 archive since it optimizes for lots of writes. The only save
808 for implementing start_bulk_insert() is that we could skip
809 setting dirty to true each time.
811 int ha_archive::write_row(uchar
*buf
)
814 uchar
*read_buf
= NULL
;
816 uchar
*record
= table
->record
[0];
817 DBUG_ENTER("ha_archive::write_row");
820 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE
);
822 ha_statistic_increment(&SSV::ha_write_count
);
823 if (table
->timestamp_field_type
& TIMESTAMP_AUTO_SET_ON_INSERT
)
824 table
->timestamp_field
->set_time();
825 pthread_mutex_lock(&share
->mutex
);
827 if (!share
->archive_write_open
)
828 if (init_archive_writer())
829 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE
);
832 if (table
->next_number_field
&& record
== table
->record
[0])
834 KEY
*mkey
= &table
->s
->key_info
[0]; // We only support one key right now
835 update_auto_increment();
836 temp_auto
= table
->next_number_field
->val_int();
839 We don't support decrementing auto_increment. They make the performance
842 if (temp_auto
<= share
->archive_write
.auto_increment
&&
843 mkey
->flags
& HA_NOSAME
)
845 rc
= HA_ERR_FOUND_DUPP_KEY
;
850 Bad news, this will cause a search for the unique value which is very
851 expensive since we will have to do a table scan which will lock up
852 all other writers during this period. This could perhaps be optimized
857 First we create a buffer that we can use for reading rows, and can pass
860 if (!(read_buf
= (uchar
*) my_malloc(table
->s
->reclength
, MYF(MY_WME
))))
862 rc
= HA_ERR_OUT_OF_MEM
;
866 All of the buffer must be written out or we won't see all of the
869 azflush(&(share
->archive_write
), Z_SYNC_FLUSH
);
871 Set the position of the local read thread to the beginning position.
873 if (read_data_header(&archive
))
875 rc
= HA_ERR_CRASHED_ON_USAGE
;
879 Field
*mfield
= table
->next_number_field
;
881 while (!(get_row(&archive
, read_buf
)))
883 if (!memcmp(read_buf
+ mfield
->offset(record
),
884 table
->next_number_field
->ptr
,
885 mfield
->max_display_length()))
887 rc
= HA_ERR_FOUND_DUPP_KEY
;
895 if (temp_auto
> share
->archive_write
.auto_increment
)
896 stats
.auto_increment_value
=
897 (share
->archive_write
.auto_increment
= temp_auto
) + 1;
902 Notice that the global auto_increment has been increased.
903 In case of a failed row write, we will never try to reuse the value.
905 share
->rows_recorded
++;
906 rc
= real_write_row(buf
, &(share
->archive_write
));
908 pthread_mutex_unlock(&share
->mutex
);
910 my_free((uchar
*) read_buf
, MYF(0));
916 void ha_archive::get_auto_increment(ulonglong offset
, ulonglong increment
,
917 ulonglong nb_desired_values
,
918 ulonglong
*first_value
,
919 ulonglong
*nb_reserved_values
)
921 *nb_reserved_values
= ULONGLONG_MAX
;
922 *first_value
= share
->archive_write
.auto_increment
+ 1;
925 /* Initialized at each key walk (called multiple times unlike rnd_init()) */
926 int ha_archive::index_init(uint keynr
, bool sorted
)
928 DBUG_ENTER("ha_archive::index_init");
935 No indexes, so if we get a request for an index search since we tell
936 the optimizer that we have unique indexes, we scan
938 int ha_archive::index_read(uchar
*buf
, const uchar
*key
,
939 uint key_len
, enum ha_rkey_function find_flag
)
942 DBUG_ENTER("ha_archive::index_read");
943 rc
= index_read_idx(buf
, active_index
, key
, key_len
, find_flag
);
948 int ha_archive::index_read_idx(uchar
*buf
, uint index
, const uchar
*key
,
949 uint key_len
, enum ha_rkey_function find_flag
)
953 KEY
*mkey
= &table
->s
->key_info
[index
];
954 current_k_offset
= mkey
->key_part
->offset
;
956 current_key_len
= key_len
;
959 DBUG_ENTER("ha_archive::index_read_idx");
966 while (!(get_row(&archive
, buf
)))
968 if (!memcmp(current_key
, buf
+ current_k_offset
, current_key_len
))
979 DBUG_RETURN(rc
? rc
: HA_ERR_END_OF_FILE
);
983 int ha_archive::index_next(uchar
* buf
)
987 DBUG_ENTER("ha_archive::index_next");
989 while (!(get_row(&archive
, buf
)))
991 if (!memcmp(current_key
, buf
+current_k_offset
, current_key_len
))
998 DBUG_RETURN(found
? 0 : HA_ERR_END_OF_FILE
);
1002 All calls that need to scan the table start with this method. If we are told
1003 that it is a table scan we rewind the file to the beginning, otherwise
1004 we assume the position will be set.
1007 int ha_archive::rnd_init(bool scan
)
1009 DBUG_ENTER("ha_archive::rnd_init");
1012 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE
);
1014 init_archive_reader();
1016 /* We rewind the file so that we can read from the beginning if scan */
1019 scan_rows
= stats
.records
;
1020 DBUG_PRINT("info", ("archive will retrieve %llu rows",
1021 (unsigned long long) scan_rows
));
1023 if (read_data_header(&archive
))
1024 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE
);
1032 This is the method that is used to read a row. It assumes that the row is
1033 positioned where you want it.
1035 int ha_archive::get_row(azio_stream
*file_to_read
, uchar
*buf
)
1038 DBUG_ENTER("ha_archive::get_row");
1039 DBUG_PRINT("ha_archive", ("Picking version for get_row() %d -> %d",
1040 (uchar
)file_to_read
->version
,
1042 if (file_to_read
->version
== ARCHIVE_VERSION
)
1043 rc
= get_row_version3(file_to_read
, buf
);
1045 rc
= get_row_version2(file_to_read
, buf
);
1047 DBUG_PRINT("ha_archive", ("Return %d\n", rc
));
1052 /* Reallocate buffer if needed */
1053 bool ha_archive::fix_rec_buff(unsigned int length
)
1055 DBUG_ENTER("ha_archive::fix_rec_buff");
1056 DBUG_PRINT("ha_archive", ("Fixing %u for %u",
1057 length
, record_buffer
->length
));
1058 DBUG_ASSERT(record_buffer
->buffer
);
1060 if (length
> record_buffer
->length
)
1063 if (!(newptr
=(uchar
*) my_realloc((uchar
*) record_buffer
->buffer
,
1065 MYF(MY_ALLOW_ZERO_PTR
))))
1067 record_buffer
->buffer
= newptr
;
1068 record_buffer
->length
= length
;
1071 DBUG_ASSERT(length
<= record_buffer
->length
);
1076 int ha_archive::unpack_row(azio_stream
*file_to_read
, uchar
*record
)
1078 DBUG_ENTER("ha_archive::unpack_row");
1082 uchar size_buffer
[ARCHIVE_ROW_HEADER_SIZE
];
1083 unsigned int row_len
;
1085 /* First we grab the length stored */
1086 read
= azread(file_to_read
, size_buffer
, ARCHIVE_ROW_HEADER_SIZE
, &error
);
1088 if (error
== Z_STREAM_ERROR
|| (read
&& read
< ARCHIVE_ROW_HEADER_SIZE
))
1089 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE
);
1091 /* If we read nothing we are at the end of the file */
1092 if (read
== 0 || read
!= ARCHIVE_ROW_HEADER_SIZE
)
1093 DBUG_RETURN(HA_ERR_END_OF_FILE
);
1095 row_len
= uint4korr(size_buffer
);
1096 DBUG_PRINT("ha_archive",("Unpack row length %u -> %u", row_len
,
1097 (unsigned int)table
->s
->reclength
));
1099 if (fix_rec_buff(row_len
))
1101 DBUG_RETURN(HA_ERR_OUT_OF_MEM
);
1103 DBUG_ASSERT(row_len
<= record_buffer
->length
);
1105 read
= azread(file_to_read
, record_buffer
->buffer
, row_len
, &error
);
1107 if (read
!= row_len
|| error
)
1109 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE
);
1112 /* Copy null bits */
1113 const uchar
*ptr
= record_buffer
->buffer
;
1115 Field::unpack() is not called when field is NULL. For VARCHAR
1116 Field::unpack() only unpacks as much bytes as occupied by field
1117 value. In these cases respective memory area on record buffer is
1120 These uninitialized areas may be accessed by CHECKSUM TABLE or
1121 by optimizer using temporary table (BUG#12997905). We may remove
1122 this memset() when they're fixed.
1124 memset(record
, 0, table
->s
->reclength
);
1125 memcpy(record
, ptr
, table
->s
->null_bytes
);
1126 ptr
+= table
->s
->null_bytes
;
1127 for (Field
**field
=table
->field
; *field
; field
++)
1129 if (!((*field
)->is_null_in_record(record
)))
1131 ptr
= (*field
)->unpack(record
+ (*field
)->offset(table
->record
[0]), ptr
);
1138 int ha_archive::get_row_version3(azio_stream
*file_to_read
, uchar
*buf
)
1140 DBUG_ENTER("ha_archive::get_row_version3");
1142 int returnable
= unpack_row(file_to_read
, buf
);
1144 DBUG_RETURN(returnable
);
1148 int ha_archive::get_row_version2(azio_stream
*file_to_read
, uchar
*buf
)
1154 size_t total_blob_length
= 0;
1155 MY_BITMAP
*read_set
= table
->read_set
;
1156 DBUG_ENTER("ha_archive::get_row_version2");
1158 read
= azread(file_to_read
, (voidp
)buf
, table
->s
->reclength
, &error
);
1160 /* If we read nothing we are at the end of the file */
1162 DBUG_RETURN(HA_ERR_END_OF_FILE
);
1164 if (read
!= table
->s
->reclength
)
1166 DBUG_PRINT("ha_archive::get_row_version2", ("Read %u bytes expected %u",
1168 (unsigned int)table
->s
->reclength
));
1169 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE
);
1172 if (error
== Z_STREAM_ERROR
|| error
== Z_DATA_ERROR
)
1173 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE
);
1176 If the record is the wrong size, the file is probably damaged, unless
1177 we are dealing with a delayed insert or a bulk insert.
1179 if ((ulong
) read
!= table
->s
->reclength
)
1180 DBUG_RETURN(HA_ERR_END_OF_FILE
);
1182 /* Calculate blob length, we use this for our buffer */
1183 for (ptr
= table
->s
->blob_field
, end
=ptr
+ table
->s
->blob_fields
;
1187 if (bitmap_is_set(read_set
,
1188 (((Field_blob
*) table
->field
[*ptr
])->field_index
)))
1189 total_blob_length
+= ((Field_blob
*) table
->field
[*ptr
])->get_length();
1192 /* Adjust our row buffer if we need be */
1193 buffer
.alloc(total_blob_length
);
1194 last
= (char *)buffer
.ptr();
1196 /* Loop through our blobs and read them */
1197 for (ptr
= table
->s
->blob_field
, end
=ptr
+ table
->s
->blob_fields
;
1201 size_t size
= ((Field_blob
*) table
->field
[*ptr
])->get_length();
1204 if (bitmap_is_set(read_set
,
1205 ((Field_blob
*) table
->field
[*ptr
])->field_index
))
1207 read
= azread(file_to_read
, last
, size
, &error
);
1210 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE
);
1212 if ((size_t) read
!= size
)
1213 DBUG_RETURN(HA_ERR_END_OF_FILE
);
1214 ((Field_blob
*) table
->field
[*ptr
])->set_ptr(size
, (uchar
*) last
);
1219 (void)azseek(file_to_read
, size
, SEEK_CUR
);
1228 Called during ORDER BY. Its position is either from being called sequentially
1229 or by having had ha_archive::rnd_pos() called before it is called.
1232 int ha_archive::rnd_next(uchar
*buf
)
1235 DBUG_ENTER("ha_archive::rnd_next");
1238 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE
);
1241 DBUG_RETURN(HA_ERR_END_OF_FILE
);
1244 ha_statistic_increment(&SSV::ha_read_rnd_next_count
);
1245 current_position
= aztell(&archive
);
1246 rc
= get_row(&archive
, buf
);
1248 table
->status
=rc
? STATUS_NOT_FOUND
: 0;
1255 Thanks to the table flag HA_REC_NOT_IN_SEQ this will be called after
1256 each call to ha_archive::rnd_next() if an ordering of the rows is
1260 void ha_archive::position(const uchar
*record
)
1262 DBUG_ENTER("ha_archive::position");
1263 my_store_ptr(ref
, ref_length
, current_position
);
1269 This is called after a table scan for each row if the results of the
1270 scan need to be ordered. It will take *pos and use it to move the
1271 cursor in the file so that the next row that is called is the
1272 correctly ordered row.
1275 int ha_archive::rnd_pos(uchar
* buf
, uchar
*pos
)
1277 DBUG_ENTER("ha_archive::rnd_pos");
1278 ha_statistic_increment(&SSV::ha_read_rnd_next_count
);
1279 current_position
= (my_off_t
)my_get_ptr(pos
, ref_length
);
1280 if (azseek(&archive
, current_position
, SEEK_SET
) == (my_off_t
)(-1L))
1281 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE
);
1282 DBUG_RETURN(get_row(&archive
, buf
));
1286 This method repairs the meta file. It does this by walking the datafile and
1287 rewriting the meta file. If EXTENDED repair is requested, we attempt to
1288 recover as much data as possible.
1290 int ha_archive::repair(THD
* thd
, HA_CHECK_OPT
* check_opt
)
1292 DBUG_ENTER("ha_archive::repair");
1293 int rc
= optimize(thd
, check_opt
);
1296 DBUG_RETURN(HA_ADMIN_CORRUPT
);
1298 share
->crashed
= FALSE
;
/*
  The table can become fragmented if data was inserted, read, and then
  inserted again. What we do is open up the file and recompress it completely.
*/
int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
{
  DBUG_ENTER("ha_archive::optimize");
  int rc= 0;                       /* NOTE(review): declaration not visible in this chunk — confirm */
  azio_stream writer;              /* NOTE(review): declaration not visible in this chunk — confirm */
  char writer_filename[FN_REFLEN];

  /* Hold the share mutex for the whole rebuild: readers must not see a
     half-written replacement file. */
  pthread_mutex_lock(&share->mutex);
  init_archive_reader();

  // now we close both our writer and our reader for the rename
  if (share->archive_write_open)
  {
    azclose(&(share->archive_write));
    share->archive_write_open= FALSE;
  }

  /* Lets create a file to contain the new data */
  fn_format(writer_filename, share->table_name, "", ARN,
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);

  if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR|O_BINARY)))
  {
    pthread_mutex_unlock(&share->mutex);
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
  }

  /*
    An extended rebuild is a lot more effort. We open up each row and re-record it.
    Any dead rows are removed (aka rows that may have been partially recorded).

    As of Archive format 3, this is the only type that is performed, before this
    version it was just done on T_EXTEND
  */
  {
    DBUG_PRINT("ha_archive", ("archive extended rebuild"));

    /*
      Now we will rewind the archive file so that we are positioned at the
      start of the data.
    */
    rc= read_data_header(&archive);

    /*
      On success of writing out the new header, we now fetch each row and
      insert it into the new archive file.
    */
    if (!rc)  /* NOTE(review): condition line not visible in this chunk — confirm */
    {
      /* Reset counters; they are re-derived while copying rows. */
      share->rows_recorded= 0;
      stats.auto_increment_value= 1;
      share->archive_write.auto_increment= 0;
      /* Temporarily read all columns so every field is copied faithfully. */
      my_bitmap_map *org_bitmap= dbug_tmp_use_all_columns(table, table->read_set);

      while (!(rc= get_row(&archive, table->record[0])))
      {
        real_write_row(table->record[0], &writer);
        /*
          Long term it should be possible to optimize this so that
          it is not called on each row.
        */
        if (table->found_next_number_field)
        {
          Field *field= table->found_next_number_field;
          ulonglong auto_value=
            (ulonglong) field->val_int(table->record[0] +
                                       field->offset(table->record[0]));
          /* Track the highest auto_increment seen so the rebuilt file
             continues the sequence correctly. */
          if (share->archive_write.auto_increment < auto_value)
            stats.auto_increment_value=
              (share->archive_write.auto_increment= auto_value) + 1;
        }
      }

      dbug_tmp_restore_column_map(table->read_set, org_bitmap);
      share->rows_recorded= (ha_rows)writer.rows;
    }

    DBUG_PRINT("info", ("recovered %llu archive rows",
                        (unsigned long long)share->rows_recorded));

    DBUG_PRINT("ha_archive", ("recovered %llu archive rows",
                              (unsigned long long)share->rows_recorded));

    /*
      If REPAIR ... EXTENDED is requested, try to recover as much data
      from data file as possible. In this case if we failed to read a
      record, we assume EOF. This allows massive data loss, but we can
      hardly do more with broken zlib stream. And this is the only way
      to restore at least what is still recoverable.
    */
    if (rc && rc != HA_ERR_END_OF_FILE && !(check_opt->flags & T_EXTEND))
      goto error;
  }

  azclose(&writer);                /* NOTE(review): cleanup lines not visible in this chunk — confirm */
  share->dirty= FALSE;

  azclose(&archive);               /* NOTE(review): not visible in this chunk — confirm */

  // make the file we just wrote be our data file
  rc= my_rename(writer_filename,share->data_file_name,MYF(0));

  pthread_mutex_unlock(&share->mutex);

  DBUG_RETURN(rc);
error:
  DBUG_PRINT("ha_archive", ("Failed to recover, error was %d", rc));
  azclose(&writer);                /* NOTE(review): not visible in this chunk — confirm */
  pthread_mutex_unlock(&share->mutex);

  DBUG_RETURN(rc);
}
/*
  Below is an example of how to setup row level locking.
*/
THR_LOCK_DATA **ha_archive::store_lock(THD *thd,
                                       THR_LOCK_DATA **to, /* NOTE(review): parameter line not visible in this chunk; implied by the handler API — confirm */
                                       enum thr_lock_type lock_type)
{
  /* Remember whether this statement is a delayed insert; write_row uses it. */
  if (lock_type == TL_WRITE_DELAYED)
    delayed_insert= TRUE;
  else
    delayed_insert= FALSE;

  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
  {
    /*
      Here is where we get into the guts of a row level lock.

      If we are not doing a LOCK TABLE or DISCARD/IMPORT
      TABLESPACE, then allow multiple writers
    */
    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
         lock_type <= TL_WRITE) && !thd_in_lock_tables(thd)
        && !thd_tablespace_op(thd))
      lock_type = TL_WRITE_ALLOW_WRITE;

    /*
      In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
      MySQL would use the lock TL_READ_NO_INSERT on t2, and that
      would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
      to t2. Convert the lock to a normal read lock to allow
      concurrent inserts to t2.
    */
    if (lock_type == TL_READ_NO_INSERT && !thd_in_lock_tables(thd))
      lock_type = TL_READ;

    lock.type=lock_type;
  }

  *to++= &lock;  /* NOTE(review): tail not visible in this chunk; standard store_lock contract — confirm */

  return to;
}
/*
  Fill in CREATE TABLE information that is derived from the data file:
  the current auto_increment value (unless the user supplied one) and,
  if the data file is a symlink, the real DATA DIRECTORY path.
*/
void ha_archive::update_create_info(HA_CREATE_INFO *create_info)
{
  DBUG_ENTER("ha_archive::update_create_info");

  /* Refresh stats.auto_increment_value from the archive header. */
  ha_archive::info(HA_STATUS_AUTO);
  if (!(create_info->used_fields & HA_CREATE_USED_AUTO))
  {
    create_info->auto_increment_value= stats.auto_increment_value;
  }

  /* my_readlink returns 0 on success: the file is a symlink, so report
     the resolved path as the data file name. */
  if (!(my_readlink(share->real_path, share->data_file_name, MYF(0))))
    create_info->data_file_name= share->real_path;

  DBUG_VOID_RETURN;  /* NOTE(review): not visible in this chunk — confirm */
}
1483 Hints for optimizer, see ha_tina for more information
1485 int ha_archive::info(uint flag
)
1487 DBUG_ENTER("ha_archive::info");
1490 If dirty, we lock, and then reset/flush the data.
1491 I found that just calling azflush() doesn't always work.
1493 pthread_mutex_lock(&share
->mutex
);
1494 if (share
->dirty
== TRUE
)
1496 if (share
->dirty
== TRUE
)
1498 DBUG_PRINT("ha_archive", ("archive flushing out rows for scan"));
1499 azflush(&(share
->archive_write
), Z_SYNC_FLUSH
);
1500 share
->dirty
= FALSE
;
1505 This should be an accurate number now, though bulk and delayed inserts can
1506 cause the number to be inaccurate.
1508 stats
.records
= share
->rows_recorded
;
1509 pthread_mutex_unlock(&share
->mutex
);
1513 DBUG_PRINT("ha_archive", ("Stats rows is %d\n", (int)stats
.records
));
1514 /* Costs quite a bit more to get all information */
1515 if (flag
& HA_STATUS_TIME
)
1517 MY_STAT file_stat
; // Stat information for the data file
1519 VOID(my_stat(share
->data_file_name
, &file_stat
, MYF(MY_WME
)));
1521 stats
.data_file_length
= file_stat
.st_size
;
1522 stats
.create_time
= (ulong
) file_stat
.st_ctime
;
1523 stats
.update_time
= (ulong
) file_stat
.st_mtime
;
1524 stats
.mean_rec_length
= stats
.records
?
1525 ulong(stats
.data_file_length
/ stats
.records
) : table
->s
->reclength
;
1526 stats
.max_data_file_length
= MAX_FILE_SIZE
;
1528 stats
.delete_length
= 0;
1529 stats
.index_file_length
=0;
1531 if (flag
& HA_STATUS_AUTO
)
1533 init_archive_reader();
1534 pthread_mutex_lock(&share
->mutex
);
1535 azflush(&archive
, Z_SYNC_FLUSH
);
1536 pthread_mutex_unlock(&share
->mutex
);
1537 stats
.auto_increment_value
= archive
.auto_increment
+ 1;
/*
  This method tells us that a bulk insert operation is about to occur. We set
  a flag which will keep write_row from saying that its data is dirty. This in
  turn will keep selects from causing a sync to occur.
  Basically, yet another optimizations to keep compression working well.
*/
void ha_archive::start_bulk_insert(ha_rows rows)
{
  DBUG_ENTER("ha_archive::start_bulk_insert");
  /* rows == 0 means "unknown row count" — assume bulk in that case too. */
  if (!rows || rows >= ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT)
    bulk_insert= TRUE;  /* NOTE(review): body line not visible in this chunk — confirm */
  DBUG_VOID_RETURN;     /* NOTE(review): not visible in this chunk — confirm */
}
/*
  Other side of start_bulk_insert, is end_bulk_insert. Here we turn off the bulk insert
  flag, and set the share dirty so that the next select will call sync for us.
*/
int ha_archive::end_bulk_insert()
{
  DBUG_ENTER("ha_archive::end_bulk_insert");
  bulk_insert= FALSE;   /* NOTE(review): body not visible in this chunk; implied by the comment above — confirm */
  share->dirty= TRUE;   /* NOTE(review): not visible in this chunk — confirm */
  DBUG_RETURN(0);       /* NOTE(review): not visible in this chunk — confirm */
}
/*
  We cancel a truncate command. The only way to delete an archive table is to drop it.
  This is done for security reasons. In a later version we will enable this by
  allowing the user to select a different row format.
*/
int ha_archive::delete_all_rows()
{
  DBUG_ENTER("ha_archive::delete_all_rows");
  /* TRUNCATE/DELETE-all is deliberately refused for this engine. */
  DBUG_RETURN(HA_ERR_WRONG_COMMAND);
}
1583 We just return state if asked.
1585 bool ha_archive::is_crashed() const
1587 DBUG_ENTER("ha_archive::is_crashed");
1588 DBUG_RETURN(share
->crashed
);
/*
  Simple scan of the tables to make sure everything is ok.
*/

int ha_archive::check(THD* thd, HA_CHECK_OPT* check_opt)
{
  int rc= 0;                       /* NOTE(review): declaration not visible in this chunk — confirm */
  const char *old_proc_info;
  ha_rows count;                   /* NOTE(review): declaration not visible in this chunk — confirm */
  DBUG_ENTER("ha_archive::check");

  old_proc_info= thd_proc_info(thd, "Checking table");
  /* Snapshot the row count and flush pending writes under the mutex. */
  pthread_mutex_lock(&share->mutex);
  count= share->rows_recorded;
  /* Flush any waiting data */
  if (share->archive_write_open)
    azflush(&(share->archive_write), Z_SYNC_FLUSH);
  pthread_mutex_unlock(&share->mutex);

  if (init_archive_reader())
    DBUG_RETURN(HA_ADMIN_CORRUPT);
  /*
    Now we will rewind the archive file so that we are positioned at the
    start of the data.
  */
  read_data_header(&archive);
  /* Every snapshotted row must be readable, or the table is corrupt. */
  for (ha_rows cur_count= count; cur_count; cur_count--)
  {
    if ((rc= get_row(&archive, table->record[0])))
      goto error;                  /* NOTE(review): jump not visible in this chunk; implied by the error label — confirm */
  }
  /*
    Now read records that may have been inserted concurrently.
    Acquire share->mutex so tail of the table is not modified by
    concurrent writers.
  */
  pthread_mutex_lock(&share->mutex);
  count= share->rows_recorded - count;
  if (share->archive_write_open)
    azflush(&(share->archive_write), Z_SYNC_FLUSH);
  while (!(rc= get_row(&archive, table->record[0])))
    count--;                       /* NOTE(review): loop body not visible in this chunk — confirm */
  pthread_mutex_unlock(&share->mutex);

  /* Reaching EOF with count == 0 means every recorded row was readable. */
  if ((rc && rc != HA_ERR_END_OF_FILE) || count)
    goto error;                    /* NOTE(review): not visible in this chunk — confirm */

  thd_proc_info(thd, old_proc_info);
  DBUG_RETURN(HA_ADMIN_OK);

error:                             /* NOTE(review): label not visible in this chunk — confirm */
  thd_proc_info(thd, old_proc_info);
  share->crashed= FALSE;
  DBUG_RETURN(HA_ADMIN_CORRUPT);
}
/*
  Check and repair the table if needed.
*/
bool ha_archive::check_and_repair(THD *thd)
{
  HA_CHECK_OPT check_opt;
  DBUG_ENTER("ha_archive::check_and_repair");

  check_opt.init();  /* NOTE(review): not visible in this chunk; required before use — confirm */

  /* Auto-repair goes straight to repair(); it runs its own full rebuild. */
  DBUG_RETURN(repair(thd, &check_opt));
}
/*
  Allocate an archive_record_buffer holding a data buffer of the given
  length. Returns NULL if either allocation fails; on buffer-allocation
  failure the struct itself is freed first, so nothing leaks.
*/
archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
{
  DBUG_ENTER("ha_archive::create_record_buffer");
  archive_record_buffer *r;
  if (!(r=                         /* NOTE(review): assignment line not visible in this chunk — confirm */
        (archive_record_buffer*) my_malloc(sizeof(archive_record_buffer),
                                           MYF(MY_WME))))  /* NOTE(review): flags not visible in this chunk — confirm */
  {
    DBUG_RETURN(NULL); /* purecov: inspected */
  }
  r->length= (int)length;

  if (!(r->buffer= (uchar*) my_malloc(r->length,
                                      MYF(MY_WME))))  /* NOTE(review): flags not visible in this chunk — confirm */
  {
    my_free((char*) r, MYF(MY_ALLOW_ZERO_PTR));
    DBUG_RETURN(NULL); /* purecov: inspected */
  }

  DBUG_RETURN(r);  /* NOTE(review): not visible in this chunk — confirm */
}
/*
  Release a buffer created by create_record_buffer: the data buffer first,
  then the struct. MY_ALLOW_ZERO_PTR makes both frees NULL-safe.
*/
void ha_archive::destroy_record_buffer(archive_record_buffer *r)
{
  DBUG_ENTER("ha_archive::destroy_record_buffer");
  my_free((char*) r->buffer, MYF(MY_ALLOW_ZERO_PTR));
  my_free((char*) r, MYF(MY_ALLOW_ZERO_PTR));
  DBUG_VOID_RETURN;  /* NOTE(review): not visible in this chunk — confirm */
}
/* Handlerton descriptor exported to the plugin interface. */
struct st_mysql_storage_engine archive_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };

/* Plugin registration for the ARCHIVE engine. */
mysql_declare_plugin(archive)
{
  MYSQL_STORAGE_ENGINE_PLUGIN,
  &archive_storage_engine,
  "ARCHIVE",                  /* NOTE(review): name field not visible in this chunk — confirm */
  "Brian Aker, MySQL AB",
  "Archive storage engine",
  PLUGIN_LICENSE_GPL,         /* NOTE(review): not visible in this chunk — confirm */
  archive_db_init,            /* Plugin Init */
  archive_db_done,            /* Plugin Deinit */
  0x0300 /* 3.0 */,           /* NOTE(review): version not visible in this chunk — confirm */
  NULL,                       /* status variables */
  NULL,                       /* system variables */
  NULL                        /* config options */
}
mysql_declare_plugin_end;