2 Copyright (c) 2000, 2010, Oracle and/or its affiliates. All rights reserved.
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 Caching of files that only do (sequential) reads or writes of fixed-
20 length records. A read isn't allowed to go past the file length. A read is ok
21 if it ends at the file length, and the next read can try to read past the file length
22 (and get an EOF error).
23 Possibly use of asynchronous io.
24 macros for read and writes for faster io.
25 Used instead of FILE when reading or writing whole files.
26 This code makes mf_rec_cache obsolete (currently only used by ISAM)
27 One can change info->pos_in_file to a higher value to skip bytes in file if
28 also info->read_pos is set to info->read_end.
29 If called through open_cached_file(), then the temporary file will
30 only be created if a write exceeds the file buffer or if one calls
31 my_b_flush_io_cache().
33 If one uses SEQ_READ_APPEND, then two buffers are allocated, one for
34 reading and another for writing. Reads are first done from disk and
35 then done from the write buffer. This is an efficient way to read
36 from a log file when one is writing to it at the same time.
37 For this to work, the file has to be opened in append mode!
38 Note that when one uses SEQ_READ_APPEND, one MUST write using
39 my_b_append ! This is needed because we need to lock the mutex
40 every time we access the write buffer.
43 When one uses SEQ_READ_APPEND and we are reading and writing at the same time,
44 each time the write buffer gets full and it's written to disk, we will
45 always do a disk read to read a part of the buffer from disk to the
47 This should be fixed so that when we do a my_b_flush_io_cache() and
48 we have been reading the write buffer, we should transfer the rest of the
49 write buffer to the read buffer before we start to reuse it.
52 #define MAP_TO_USE_RAID
53 #include "mysys_priv.h"
56 #include "mysys_err.h"
57 static void my_aiowait(my_aio_result
*result
);
62 #define lock_append_buffer(info) \
63 pthread_mutex_lock(&(info)->append_buffer_lock)
64 #define unlock_append_buffer(info) \
65 pthread_mutex_unlock(&(info)->append_buffer_lock)
67 #define lock_append_buffer(info)
68 #define unlock_append_buffer(info)
71 #define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
72 #define IO_ROUND_DN(X) ( (X) & ~(IO_SIZE-1))
75 Setup internal pointers inside IO_CACHE
82 This is called on automaticly on init or reinit of IO_CACHE
83 It must be called externally if one moves or copies an IO_CACHE
87 void setup_io_cache(IO_CACHE
* info
)
89 /* Ensure that my_b_tell() and my_b_bytes_in_cache works */
90 if (info
->type
== WRITE_CACHE
)
92 info
->current_pos
= &info
->write_pos
;
93 info
->current_end
= &info
->write_end
;
97 info
->current_pos
= &info
->read_pos
;
98 info
->current_end
= &info
->read_end
;
104 init_functions(IO_CACHE
* info
)
106 enum cache_type type
= info
->type
;
110 Must be initialized by the caller. The problem is that
111 _my_b_net_read has to be defined in sql directory because of
112 the dependency on THD, and therefore cannot be visible to
113 programs that link against mysys but know nothing about THD, such
117 case SEQ_READ_APPEND
:
118 info
->read_function
= _my_b_seq_read
;
119 info
->write_function
= 0; /* Force a core if used */
122 info
->read_function
=
124 info
->share
? _my_b_read_r
:
127 info
->write_function
= _my_b_write
;
130 setup_io_cache(info
);
135 Initialize an IO_CACHE object
139 info cache handler to initialize
140 file File that should be associated to to the handler
141 If == -1 then real_open_cached_file()
142 will be called when it's time to open file.
143 cachesize Size of buffer to allocate for read/write
144 If == 0 then use my_default_record_cache_size
146 seek_offset Where cache should start reading/writing
147 use_async_io Set to 1 if we should use async_io (if available)
148 cache_myflags Bitmap of different flags
149 MY_WME | MY_FAE | MY_NABP | MY_FNABP |
150 MY_DONT_CHECK_FILESIZE
157 int init_io_cache(IO_CACHE
*info
, File file
, size_t cachesize
,
158 enum cache_type type
, my_off_t seek_offset
,
159 pbool use_async_io
, myf cache_myflags
)
163 my_off_t end_of_file
= ~(my_off_t
) 0;
164 DBUG_ENTER("init_io_cache");
165 DBUG_PRINT("enter",("cache: 0x%lx type: %d pos: %ld",
166 (ulong
) info
, (int) type
, (ulong
) seek_offset
));
169 info
->type
= TYPE_NOT_SET
; /* Don't set it until mutex are created */
170 info
->pos_in_file
= seek_offset
;
171 info
->pre_close
= info
->pre_read
= info
->post_read
= 0;
173 info
->alloced_buffer
= 0;
175 info
->seek_not_done
= 0;
179 pos
= my_tell(file
, MYF(0));
180 if ((pos
== (my_off_t
) -1) && (my_errno
== ESPIPE
))
183 This kind of object doesn't support seek() or tell(). Don't set a
184 flag that will make us again try to seek() later and fail.
186 info
->seek_not_done
= 0;
188 Additionally, if we're supposed to start somewhere other than the
189 the beginning of whatever this file is, then somebody made a bad
192 DBUG_ASSERT(seek_offset
== 0);
195 info
->seek_not_done
= test(seek_offset
!= pos
);
198 info
->disk_writes
= 0;
203 if (!cachesize
&& !(cachesize
= my_default_record_cache_size
))
204 DBUG_RETURN(1); /* No cache requested */
205 min_cache
=use_async_io
? IO_SIZE
*4 : IO_SIZE
*2;
206 if (type
== READ_CACHE
|| type
== SEQ_READ_APPEND
)
207 { /* Assume file isn't growing */
208 if (!(cache_myflags
& MY_DONT_CHECK_FILESIZE
))
210 /* Calculate end of file to avoid allocating oversized buffers */
211 end_of_file
=my_seek(file
,0L,MY_SEEK_END
,MYF(0));
212 /* Need to reset seek_not_done now that we just did a seek. */
213 info
->seek_not_done
= end_of_file
== seek_offset
? 0 : 1;
214 if (end_of_file
< seek_offset
)
215 end_of_file
=seek_offset
;
216 /* Trim cache size if the file is very small */
217 if ((my_off_t
) cachesize
> end_of_file
-seek_offset
+IO_SIZE
*2-1)
219 cachesize
= (size_t) (end_of_file
-seek_offset
)+IO_SIZE
*2-1;
220 use_async_io
=0; /* No need to use async */
224 cache_myflags
&= ~MY_DONT_CHECK_FILESIZE
;
225 if (type
!= READ_NET
&& type
!= WRITE_NET
)
227 /* Retry allocating memory in smaller blocks until we get one */
228 cachesize
= ((cachesize
+ min_cache
-1) & ~(min_cache
-1));
233 Unset MY_WAIT_IF_FULL bit if it is set, to prevent conflict with
236 myf flags
= (myf
) (cache_myflags
& ~(MY_WME
| MY_WAIT_IF_FULL
));
238 if (cachesize
< min_cache
)
239 cachesize
= min_cache
;
240 buffer_block
= cachesize
;
241 if (type
== SEQ_READ_APPEND
)
243 if (cachesize
== min_cache
)
244 flags
|= (myf
) MY_WME
;
246 if ((info
->buffer
= (uchar
*) my_malloc(buffer_block
, flags
)) != 0)
248 info
->write_buffer
=info
->buffer
;
249 if (type
== SEQ_READ_APPEND
)
250 info
->write_buffer
= info
->buffer
+ cachesize
;
251 info
->alloced_buffer
=1;
252 break; /* Enough memory found */
254 if (cachesize
== min_cache
)
255 DBUG_RETURN(2); /* Can't alloc cache */
256 /* Try with less memory */
257 cachesize
= (cachesize
*3/4 & ~(min_cache
-1));
261 DBUG_PRINT("info",("init_io_cache: cachesize = %lu", (ulong
) cachesize
));
262 info
->read_length
=info
->buffer_length
=cachesize
;
263 info
->myflags
=cache_myflags
& ~(MY_NABP
| MY_FNABP
);
264 info
->request_pos
= info
->read_pos
= info
->write_pos
= info
->buffer
;
265 if (type
== SEQ_READ_APPEND
)
267 info
->append_read_pos
= info
->write_pos
= info
->write_buffer
;
268 info
->write_end
= info
->write_buffer
+ info
->buffer_length
;
270 pthread_mutex_init(&info
->append_buffer_lock
,MY_MUTEX_INIT_FAST
);
273 #if defined(SAFE_MUTEX) && defined(THREAD)
276 /* Clear mutex so that safe_mutex will notice that it's not initialized */
277 bzero((char*) &info
->append_buffer_lock
, sizeof(info
));
281 if (type
== WRITE_CACHE
)
283 info
->buffer
+info
->buffer_length
- (seek_offset
& (IO_SIZE
-1));
285 info
->read_end
=info
->buffer
; /* Nothing in cache */
287 /* End_of_file may be changed by user later */
288 info
->end_of_file
= end_of_file
;
291 init_functions(info
);
293 if (use_async_io
&& ! my_disable_async_io
)
295 DBUG_PRINT("info",("Using async io"));
296 info
->read_length
/=2;
297 info
->read_function
=_my_b_async_read
;
299 info
->inited
=info
->aio_result
.pending
=0;
302 } /* init_io_cache */
304 /* Wait until current request is ready */
307 static void my_aiowait(my_aio_result
*result
)
311 struct aio_result_t
*tmp
;
314 if ((int) (tmp
=aiowait((struct timeval
*) 0)) == -1)
318 DBUG_PRINT("error",("No aio request, error: %d",errno
));
319 result
->pending
=0; /* Assume everythings is ok */
322 ((my_aio_result
*) tmp
)->pending
=0;
323 if ((my_aio_result
*) tmp
== result
)
333 Use this to reset cache to re-start reading or to change the type
334 between READ_CACHE <-> WRITE_CACHE
335 If we are doing a reinit of a cache where we have the start of the file
336 in the cache, we are reusing this memory without flushing it to disk.
339 my_bool
reinit_io_cache(IO_CACHE
*info
, enum cache_type type
,
340 my_off_t seek_offset
,
341 pbool use_async_io
__attribute__((unused
)),
344 DBUG_ENTER("reinit_io_cache");
345 DBUG_PRINT("enter",("cache: 0x%lx type: %d seek_offset: %lu clear_cache: %d",
346 (ulong
) info
, type
, (ulong
) seek_offset
,
349 /* One can't do reinit with the following types */
350 DBUG_ASSERT(type
!= READ_NET
&& info
->type
!= READ_NET
&&
351 type
!= WRITE_NET
&& info
->type
!= WRITE_NET
&&
352 type
!= SEQ_READ_APPEND
&& info
->type
!= SEQ_READ_APPEND
);
354 /* If the whole file is in memory, avoid flushing to disk */
356 seek_offset
>= info
->pos_in_file
&&
357 seek_offset
<= my_b_tell(info
))
359 /* Reuse current buffer without flushing it to disk */
361 if (info
->type
== WRITE_CACHE
&& type
== READ_CACHE
)
363 info
->read_end
=info
->write_pos
;
364 info
->end_of_file
=my_b_tell(info
);
366 Trigger a new seek only if we have a valid
369 info
->seek_not_done
= (info
->file
!= -1);
371 else if (type
== WRITE_CACHE
)
373 if (info
->type
== READ_CACHE
)
375 info
->write_end
=info
->write_buffer
+info
->buffer_length
;
376 info
->seek_not_done
=1;
378 info
->end_of_file
= ~(my_off_t
) 0;
380 pos
=info
->request_pos
+(seek_offset
-info
->pos_in_file
);
381 if (type
== WRITE_CACHE
)
386 my_aiowait(&info
->aio_result
); /* Wait for outstanding req */
392 If we change from WRITE_CACHE to READ_CACHE, assume that everything
393 after the current positions should be ignored
395 if (info
->type
== WRITE_CACHE
&& type
== READ_CACHE
)
396 info
->end_of_file
=my_b_tell(info
);
397 /* flush cache if we want to reuse it */
398 if (!clear_cache
&& my_b_flush_io_cache(info
,1))
400 info
->pos_in_file
=seek_offset
;
401 /* Better to do always do a seek */
402 info
->seek_not_done
=1;
403 info
->request_pos
=info
->read_pos
=info
->write_pos
=info
->buffer
;
404 if (type
== READ_CACHE
)
406 info
->read_end
=info
->buffer
; /* Nothing in cache */
410 info
->write_end
=(info
->buffer
+ info
->buffer_length
-
411 (seek_offset
& (IO_SIZE
-1)));
412 info
->end_of_file
= ~(my_off_t
) 0;
417 init_functions(info
);
420 if (use_async_io
&& ! my_disable_async_io
&&
421 ((ulong
) info
->buffer_length
<
422 (ulong
) (info
->end_of_file
- seek_offset
)))
424 info
->read_length
=info
->buffer_length
/2;
425 info
->read_function
=_my_b_async_read
;
430 } /* reinit_io_cache */
439 info IO_CACHE pointer
440 Buffer Buffer to retrieve count bytes from file
441 Count Number of bytes to read into Buffer
444 This function is only called from the my_b_read() macro when there
445 isn't enough characters in the buffer to satisfy the request.
449 When changing this function, be careful with handling file offsets
450 (end-of_file, pos_in_file). Do not cast them to possibly smaller
451 types than my_off_t unless you can be sure that their value fits.
452 Same applies to differences of file offsets.
454 When changing this function, check _my_b_read_r(). It might need the
458 0 we succeeded in reading all data
459 1 Error: can't read requested characters
462 int _my_b_read(register IO_CACHE
*info
, uchar
*Buffer
, size_t Count
)
464 size_t length
,diff_length
,left_length
, max_length
;
465 my_off_t pos_in_file
;
466 DBUG_ENTER("_my_b_read");
468 if ((left_length
= (size_t) (info
->read_end
-info
->read_pos
)))
470 DBUG_ASSERT(Count
>= left_length
); /* User is not using my_b_read() */
471 memcpy(Buffer
,info
->read_pos
, left_length
);
476 /* pos_in_file always point on where info->buffer was read */
477 pos_in_file
=info
->pos_in_file
+ (size_t) (info
->read_end
- info
->buffer
);
480 Whenever a function which operates on IO_CACHE flushes/writes
481 some part of the IO_CACHE to disk it will set the property
482 "seek_not_done" to indicate this to other functions operating
485 if (info
->seek_not_done
)
487 if ((my_seek(info
->file
,pos_in_file
,MY_SEEK_SET
,MYF(0))
488 != MY_FILEPOS_ERROR
))
490 /* No error, reset seek_not_done flag. */
491 info
->seek_not_done
= 0;
496 If the seek failed and the error number is ESPIPE, it is because
497 info->file is a pipe or socket or FIFO. We never should have tried
498 to seek on that. See Bugs#25807 and #22828 for more info.
500 DBUG_ASSERT(my_errno
!= ESPIPE
);
506 diff_length
= (size_t) (pos_in_file
& (IO_SIZE
-1));
507 if (Count
>= (size_t) (IO_SIZE
+(IO_SIZE
-diff_length
)))
508 { /* Fill first intern buffer */
510 if (info
->end_of_file
<= pos_in_file
)
512 info
->error
= (int) left_length
;
515 length
=(Count
& (size_t) ~(IO_SIZE
-1))-diff_length
;
516 if ((read_length
= my_read(info
->file
,Buffer
, length
, info
->myflags
))
519 info
->error
= (read_length
== (size_t) -1 ? -1 :
520 (int) (read_length
+left_length
));
530 max_length
= info
->read_length
-diff_length
;
531 if (info
->type
!= READ_FIFO
&&
532 max_length
> (info
->end_of_file
- pos_in_file
))
533 max_length
= (size_t) (info
->end_of_file
- pos_in_file
);
538 info
->error
= left_length
; /* We only got this many char */
541 length
=0; /* Didn't read any chars */
543 else if ((length
= my_read(info
->file
,info
->buffer
, max_length
,
544 info
->myflags
)) < Count
||
545 length
== (size_t) -1)
547 if (length
!= (size_t) -1)
548 memcpy(Buffer
, info
->buffer
, length
);
549 info
->pos_in_file
= pos_in_file
;
550 info
->error
= length
== (size_t) -1 ? -1 : (int) (length
+left_length
);
551 info
->read_pos
=info
->read_end
=info
->buffer
;
554 info
->read_pos
=info
->buffer
+Count
;
555 info
->read_end
=info
->buffer
+length
;
556 info
->pos_in_file
=pos_in_file
;
557 memcpy(Buffer
, info
->buffer
, Count
);
564 Prepare IO_CACHE for shared use.
567 init_io_cache_share()
568 read_cache A read cache. This will be copied for
569 every thread after setup.
571 write_cache If non-NULL a write cache that is to be
572 synchronized with the read caches.
573 num_threads Number of threads sharing the cache
574 including the write thread if any.
578 The shared cache is used so: One IO_CACHE is initialized with
579 init_io_cache(). This includes the allocation of a buffer. Then a
580 share is allocated and init_io_cache_share() is called with the io
581 cache and the share. Then the io cache is copied for each thread. So
582 every thread has its own copy of IO_CACHE. But the allocated buffer
583 is shared because cache->buffer is the same for all caches.
585 One thread reads data from the file into the buffer. All threads
586 read from the buffer, but every thread maintains its own set of
587 pointers into the buffer. When all threads have used up the buffer
588 contents, one of the threads reads the next block of data into the
589 buffer. To accomplish this, each thread enters the cache lock before
590 accessing the buffer. They wait in lock_io_cache() until all threads
591 joined the lock. The last thread entering the lock is in charge of
592 reading from file to buffer. It wakes all threads when done.
594 Synchronizing a write cache to the read caches works so: Whenever
595 the write buffer needs a flush, the write thread enters the lock and
596 waits for all other threads to enter the lock too. They do this when
597 they have used up the read buffer. When all threads are in the lock,
598 the write thread copies the write buffer to the read buffer and
601 share->running_threads is the number of threads not being in the
602 cache lock. When entering lock_io_cache() the number is decreased.
603 When the thread that fills the buffer enters unlock_io_cache() the
604 number is reset to the number of threads. The condition
605 running_threads == 0 means that all threads are in the lock. Bumping
606 up the number to the full count is non-intuitive. But increasing the
607 number by one for each thread that leaves the lock could lead to a
608 solo run of one thread. The last thread to join a lock reads from
609 file to buffer, wakes the other threads, processes the data in the
610 cache and enters the lock again. If no other thread left the lock
611 meanwhile, it would think it's the last one again and read the next
614 The share has copies of 'error', 'buffer', 'read_end', and
615 'pos_in_file' from the thread that filled the buffer. We may not be
616 able to access this information directly from its cache because the
617 thread may be removed from the share before the variables could be
618 copied by all other threads. Or, if a write buffer is synchronized,
619 it would change its 'pos_in_file' after waking the other threads,
620 possibly before they could copy its value.
622 However, the 'buffer' variable in the share is for a synchronized
623 write cache. It needs to know where to put the data. Otherwise it
624 would need access to the read cache of one of the threads that is
625 not yet removed from the share.
631 void init_io_cache_share(IO_CACHE
*read_cache
, IO_CACHE_SHARE
*cshare
,
632 IO_CACHE
*write_cache
, uint num_threads
)
634 DBUG_ENTER("init_io_cache_share");
635 DBUG_PRINT("io_cache_share", ("read_cache: 0x%lx share: 0x%lx "
636 "write_cache: 0x%lx threads: %u",
637 (long) read_cache
, (long) cshare
,
638 (long) write_cache
, num_threads
));
640 DBUG_ASSERT(num_threads
> 1);
641 DBUG_ASSERT(read_cache
->type
== READ_CACHE
);
642 DBUG_ASSERT(!write_cache
|| (write_cache
->type
== WRITE_CACHE
));
644 pthread_mutex_init(&cshare
->mutex
, MY_MUTEX_INIT_FAST
);
645 pthread_cond_init(&cshare
->cond
, 0);
646 pthread_cond_init(&cshare
->cond_writer
, 0);
648 cshare
->running_threads
= num_threads
;
649 cshare
->total_threads
= num_threads
;
650 cshare
->error
= 0; /* Initialize. */
651 cshare
->buffer
= read_cache
->buffer
;
652 cshare
->read_end
= NULL
; /* See function comment of lock_io_cache(). */
653 cshare
->pos_in_file
= 0; /* See function comment of lock_io_cache(). */
654 cshare
->source_cache
= write_cache
; /* Can be NULL. */
656 read_cache
->share
= cshare
;
657 read_cache
->read_function
= _my_b_read_r
;
658 read_cache
->current_pos
= NULL
;
659 read_cache
->current_end
= NULL
;
662 write_cache
->share
= cshare
;
669 Remove a thread from shared access to IO_CACHE.
673 cache The IO_CACHE to be removed from the share.
677 Every thread must do that on exit for not to deadlock other threads.
679 The last thread destroys the pthread resources.
681 A writer flushes its cache first.
687 void remove_io_thread(IO_CACHE
*cache
)
689 IO_CACHE_SHARE
*cshare
= cache
->share
;
691 DBUG_ENTER("remove_io_thread");
693 /* If the writer goes, it needs to flush the write cache. */
694 if (cache
== cshare
->source_cache
)
695 flush_io_cache(cache
);
697 pthread_mutex_lock(&cshare
->mutex
);
698 DBUG_PRINT("io_cache_share", ("%s: 0x%lx",
699 (cache
== cshare
->source_cache
) ?
700 "writer" : "reader", (long) cache
));
702 /* Remove from share. */
703 total
= --cshare
->total_threads
;
704 DBUG_PRINT("io_cache_share", ("remaining threads: %u", total
));
706 /* Detach from share. */
709 /* If the writer goes, let the readers know. */
710 if (cache
== cshare
->source_cache
)
712 DBUG_PRINT("io_cache_share", ("writer leaves"));
713 cshare
->source_cache
= NULL
;
716 /* If all threads are waiting for me to join the lock, wake them. */
717 if (!--cshare
->running_threads
)
719 DBUG_PRINT("io_cache_share", ("the last running thread leaves, wake all"));
720 pthread_cond_signal(&cshare
->cond_writer
);
721 pthread_cond_broadcast(&cshare
->cond
);
724 pthread_mutex_unlock(&cshare
->mutex
);
728 DBUG_PRINT("io_cache_share", ("last thread removed, destroy share"));
729 pthread_cond_destroy (&cshare
->cond_writer
);
730 pthread_cond_destroy (&cshare
->cond
);
731 pthread_mutex_destroy(&cshare
->mutex
);
739 Lock IO cache and wait for all other threads to join.
743 cache The cache of the thread entering the lock.
744 pos File position of the block to read.
745 Unused for the write thread.
749 Wait for all threads to finish with the current buffer. We want
750 all threads to proceed in concert. The last thread to join
751 lock_io_cache() will read the block from file and all threads start
752 to use it. Then they will join again for reading the next block.
754 The waiting threads detect a fresh buffer by comparing
755 cshare->pos_in_file with the position they want to process next.
756 Since the first block may start at position 0, we take
757 cshare->read_end as an additional condition. This variable is
758 initialized to NULL and will be set after a block of data is written
762 1 OK, lock in place, go ahead and read.
763 0 OK, unlocked, another thread did the read.
766 static int lock_io_cache(IO_CACHE
*cache
, my_off_t pos
)
768 IO_CACHE_SHARE
*cshare
= cache
->share
;
769 DBUG_ENTER("lock_io_cache");
771 /* Enter the lock. */
772 pthread_mutex_lock(&cshare
->mutex
);
773 cshare
->running_threads
--;
774 DBUG_PRINT("io_cache_share", ("%s: 0x%lx pos: %lu running: %u",
775 (cache
== cshare
->source_cache
) ?
776 "writer" : "reader", (long) cache
, (ulong
) pos
,
777 cshare
->running_threads
));
779 if (cshare
->source_cache
)
781 /* A write cache is synchronized to the read caches. */
783 if (cache
== cshare
->source_cache
)
785 /* The writer waits until all readers are here. */
786 while (cshare
->running_threads
)
788 DBUG_PRINT("io_cache_share", ("writer waits in lock"));
789 pthread_cond_wait(&cshare
->cond_writer
, &cshare
->mutex
);
791 DBUG_PRINT("io_cache_share", ("writer awoke, going to copy"));
793 /* Stay locked. Leave the lock later by unlock_io_cache(). */
797 /* The last thread wakes the writer. */
798 if (!cshare
->running_threads
)
800 DBUG_PRINT("io_cache_share", ("waking writer"));
801 pthread_cond_signal(&cshare
->cond_writer
);
805 Readers wait until the data is copied from the writer. Another
806 reason to stop waiting is the removal of the write thread. If this
807 happens, we leave the lock with old data in the buffer.
809 while ((!cshare
->read_end
|| (cshare
->pos_in_file
< pos
)) &&
810 cshare
->source_cache
)
812 DBUG_PRINT("io_cache_share", ("reader waits in lock"));
813 pthread_cond_wait(&cshare
->cond
, &cshare
->mutex
);
817 If the writer was removed from the share while this thread was
818 asleep, we need to simulate an EOF condition. The writer cannot
819 reset the share variables as they might still be in use by readers
820 of the last block. When we awake here then because the last
821 joining thread signalled us. If the writer is not the last, it
822 will not signal. So it is safe to clear the buffer here.
824 if (!cshare
->read_end
|| (cshare
->pos_in_file
< pos
))
826 DBUG_PRINT("io_cache_share", ("reader found writer removed. EOF"));
827 cshare
->read_end
= cshare
->buffer
; /* Empty buffer. */
828 cshare
->error
= 0; /* EOF is not an error. */
834 There are read caches only. The last thread arriving in
835 lock_io_cache() continues with a locked cache and reads the block.
837 if (!cshare
->running_threads
)
839 DBUG_PRINT("io_cache_share", ("last thread joined, going to read"));
840 /* Stay locked. Leave the lock later by unlock_io_cache(). */
845 All other threads wait until the requested block is read by the
846 last thread arriving. Another reason to stop waiting is the
847 removal of a thread. If this leads to all threads being in the
848 lock, we have to continue also. The first of the awaken threads
849 will then do the read.
851 while ((!cshare
->read_end
|| (cshare
->pos_in_file
< pos
)) &&
852 cshare
->running_threads
)
854 DBUG_PRINT("io_cache_share", ("reader waits in lock"));
855 pthread_cond_wait(&cshare
->cond
, &cshare
->mutex
);
858 /* If the block is not yet read, continue with a locked cache and read. */
859 if (!cshare
->read_end
|| (cshare
->pos_in_file
< pos
))
861 DBUG_PRINT("io_cache_share", ("reader awoke, going to read"));
862 /* Stay locked. Leave the lock later by unlock_io_cache(). */
866 /* Another thread did read the block already. */
868 DBUG_PRINT("io_cache_share", ("reader awoke, going to process %u bytes",
869 (uint
) (cshare
->read_end
? (size_t)
870 (cshare
->read_end
- cshare
->buffer
) :
874 Leave the lock. Do not call unlock_io_cache() later. The thread that
875 filled the buffer did this and marked all threads as running.
877 pthread_mutex_unlock(&cshare
->mutex
);
887 cache The cache of the thread leaving the lock.
890 This is called by the thread that filled the buffer. It marks all
891 threads as running and awakes them. This must not be done by any
894 Do not signal cond_writer. Either there is no writer or the writer
895 is the only one who can call this function.
897 The reason for resetting running_threads to total_threads before
898 waking all other threads is that it could be possible that this
899 thread is so fast with processing the buffer that it enters the lock
900 before even one other thread has left it. If every awoken thread
901 would increase running_threads by one, this thread could think that
902 he is again the last to join and would not wait for the other
903 threads to process the data.
909 static void unlock_io_cache(IO_CACHE
*cache
)
911 IO_CACHE_SHARE
*cshare
= cache
->share
;
912 DBUG_ENTER("unlock_io_cache");
913 DBUG_PRINT("io_cache_share", ("%s: 0x%lx pos: %lu running: %u",
914 (cache
== cshare
->source_cache
) ?
916 (long) cache
, (ulong
) cshare
->pos_in_file
,
917 cshare
->total_threads
));
919 cshare
->running_threads
= cshare
->total_threads
;
920 pthread_cond_broadcast(&cshare
->cond
);
921 pthread_mutex_unlock(&cshare
->mutex
);
927 Read from IO_CACHE when it is shared between several threads.
931 cache IO_CACHE pointer
932 Buffer Buffer to retrieve count bytes from file
933 Count Number of bytes to read into Buffer
936 This function is only called from the my_b_read() macro when there
937 isn't enough characters in the buffer to satisfy the request.
941 It works as follows: when a thread tries to read from a file (that
942 is, after using all the data from the (shared) buffer), it just
943 hangs on lock_io_cache(), waiting for other threads. When the very
944 last thread attempts a read, lock_io_cache() returns 1, the thread
945 does actual IO and unlock_io_cache(), which signals all the waiting
946 threads that data is in the buffer.
950 When changing this function, be careful with handling file offsets
951 (end-of_file, pos_in_file). Do not cast them to possibly smaller
952 types than my_off_t unless you can be sure that their value fits.
953 Same applies to differences of file offsets. (Bug #11527)
955 When changing this function, check _my_b_read(). It might need the
959 0 we succeeded in reading all data
960 1 Error: can't read requested characters
963 int _my_b_read_r(register IO_CACHE
*cache
, uchar
*Buffer
, size_t Count
)
965 my_off_t pos_in_file
;
966 size_t length
, diff_length
, left_length
;
967 IO_CACHE_SHARE
*cshare
= cache
->share
;
968 DBUG_ENTER("_my_b_read_r");
970 if ((left_length
= (size_t) (cache
->read_end
- cache
->read_pos
)))
972 DBUG_ASSERT(Count
>= left_length
); /* User is not using my_b_read() */
973 memcpy(Buffer
, cache
->read_pos
, left_length
);
974 Buffer
+= left_length
;
981 pos_in_file
= cache
->pos_in_file
+ (cache
->read_end
- cache
->buffer
);
982 diff_length
= (size_t) (pos_in_file
& (IO_SIZE
-1));
983 length
=IO_ROUND_UP(Count
+diff_length
)-diff_length
;
984 length
= ((length
<= cache
->read_length
) ?
985 length
+ IO_ROUND_DN(cache
->read_length
- length
) :
986 length
- IO_ROUND_UP(length
- cache
->read_length
));
987 if (cache
->type
!= READ_FIFO
&&
988 (length
> (cache
->end_of_file
- pos_in_file
)))
989 length
= (size_t) (cache
->end_of_file
- pos_in_file
);
992 cache
->error
= (int) left_length
;
995 if (lock_io_cache(cache
, pos_in_file
))
997 /* With a synchronized write/read cache we won't come here... */
998 DBUG_ASSERT(!cshare
->source_cache
);
1000 ... unless the writer has gone before this thread entered the
1001 lock. Simulate EOF in this case. It can be distinguished by
1004 if (cache
->file
< 0)
1009 Whenever a function which operates on IO_CACHE flushes/writes
1010 some part of the IO_CACHE to disk it will set the property
1011 "seek_not_done" to indicate this to other functions operating
1014 if (cache
->seek_not_done
)
1016 if (my_seek(cache
->file
, pos_in_file
, MY_SEEK_SET
, MYF(0))
1017 == MY_FILEPOS_ERROR
)
1020 unlock_io_cache(cache
);
1024 len
= my_read(cache
->file
, cache
->buffer
, length
, cache
->myflags
);
1026 DBUG_PRINT("io_cache_share", ("read %lu bytes", (ulong
) len
));
1028 cache
->read_end
= cache
->buffer
+ (len
== (size_t) -1 ? 0 : len
);
1029 cache
->error
= (len
== length
? 0 : (int) len
);
1030 cache
->pos_in_file
= pos_in_file
;
1032 /* Copy important values to the share. */
1033 cshare
->error
= cache
->error
;
1034 cshare
->read_end
= cache
->read_end
;
1035 cshare
->pos_in_file
= pos_in_file
;
1037 /* Mark all threads as running and wake them. */
1038 unlock_io_cache(cache
);
1043 With a synchronized write/read cache readers always come here.
1044 Copy important values from the share.
1046 cache
->error
= cshare
->error
;
1047 cache
->read_end
= cshare
->read_end
;
1048 cache
->pos_in_file
= cshare
->pos_in_file
;
1050 len
= ((cache
->error
== -1) ? (size_t) -1 :
1051 (size_t) (cache
->read_end
- cache
->buffer
));
1053 cache
->read_pos
= cache
->buffer
;
1054 cache
->seek_not_done
= 0;
1055 if (len
== 0 || len
== (size_t) -1)
1057 DBUG_PRINT("io_cache_share", ("reader error. len %lu left %lu",
1058 (ulong
) len
, (ulong
) left_length
));
1059 cache
->error
= (int) left_length
;
1062 cnt
= (len
> Count
) ? Count
: len
;
1063 memcpy(Buffer
, cache
->read_pos
, cnt
);
1067 cache
->read_pos
+= cnt
;
1074 Copy data from write cache to read cache.
1077 copy_to_read_buffer()
1078 write_cache The write cache.
1079 write_buffer The source of data, mostly the cache buffer.
1080 write_length The number of bytes to copy.
1083 The write thread will wait for all read threads to join the cache
1084 lock. Then it copies the data over and wakes the read threads.
1090 static void copy_to_read_buffer(IO_CACHE
*write_cache
,
1091 const uchar
*write_buffer
, size_t write_length
)
1093 IO_CACHE_SHARE
*cshare
= write_cache
->share
;
1095 DBUG_ASSERT(cshare
->source_cache
== write_cache
);
1097 write_length is usually less or equal to buffer_length.
1098 It can be bigger if _my_b_write() is called with a big length.
1100 while (write_length
)
1102 size_t copy_length
= min(write_length
, write_cache
->buffer_length
);
1103 int __attribute__((unused
)) rc
;
1105 rc
= lock_io_cache(write_cache
, write_cache
->pos_in_file
);
1106 /* The writing thread does always have the lock when it awakes. */
1109 memcpy(cshare
->buffer
, write_buffer
, copy_length
);
1112 cshare
->read_end
= cshare
->buffer
+ copy_length
;
1113 cshare
->pos_in_file
= write_cache
->pos_in_file
;
1115 /* Mark all threads as running and wake them. */
1116 unlock_io_cache(write_cache
);
1118 write_buffer
+= copy_length
;
1119 write_length
-= copy_length
;
1126 Do sequential read from the SEQ_READ_APPEND cache.
1128 We do this in three stages:
1129 - first read from info->buffer
1130 - then if there are still data to read, try the file descriptor
1131 - afterwards, if there are still data to read, try append buffer
1138 int _my_b_seq_read(register IO_CACHE
*info
, uchar
*Buffer
, size_t Count
)
1140 size_t length
, diff_length
, left_length
, save_count
, max_length
;
1141 my_off_t pos_in_file
;
1144 /* first, read the regular buffer */
1145 if ((left_length
=(size_t) (info
->read_end
-info
->read_pos
)))
1147 DBUG_ASSERT(Count
> left_length
); /* User is not using my_b_read() */
1148 memcpy(Buffer
,info
->read_pos
, left_length
);
1149 Buffer
+=left_length
;
1152 lock_append_buffer(info
);
1154 /* pos_in_file always point on where info->buffer was read */
1155 if ((pos_in_file
=info
->pos_in_file
+
1156 (size_t) (info
->read_end
- info
->buffer
)) >= info
->end_of_file
)
1157 goto read_append_buffer
;
1160 With read-append cache we must always do a seek before we read,
1161 because the write could have moved the file pointer astray
1163 if (my_seek(info
->file
,pos_in_file
,MY_SEEK_SET
,MYF(0)) == MY_FILEPOS_ERROR
)
1166 unlock_append_buffer(info
);
1169 info
->seek_not_done
=0;
1171 diff_length
= (size_t) (pos_in_file
& (IO_SIZE
-1));
1173 /* now the second stage begins - read from file descriptor */
1174 if (Count
>= (size_t) (IO_SIZE
+(IO_SIZE
-diff_length
)))
1176 /* Fill first intern buffer */
1179 length
=(Count
& (size_t) ~(IO_SIZE
-1))-diff_length
;
1180 if ((read_length
= my_read(info
->file
,Buffer
, length
,
1181 info
->myflags
)) == (size_t) -1)
1184 unlock_append_buffer(info
);
1188 Buffer
+=read_length
;
1189 pos_in_file
+=read_length
;
1191 if (read_length
!= length
)
1194 We only got part of data; Read the rest of the data from the
1197 goto read_append_buffer
;
1199 left_length
+=length
;
1203 max_length
= info
->read_length
-diff_length
;
1204 if (max_length
> (info
->end_of_file
- pos_in_file
))
1205 max_length
= (size_t) (info
->end_of_file
- pos_in_file
);
1209 goto read_append_buffer
;
1210 length
=0; /* Didn't read any more chars */
1214 length
= my_read(info
->file
,info
->buffer
, max_length
, info
->myflags
);
1215 if (length
== (size_t) -1)
1218 unlock_append_buffer(info
);
1223 memcpy(Buffer
, info
->buffer
, length
);
1228 added the line below to make
1229 DBUG_ASSERT(pos_in_file==info->end_of_file) pass.
1230 otherwise this does not appear to be needed
1232 pos_in_file
+= length
;
1233 goto read_append_buffer
;
1236 unlock_append_buffer(info
);
1237 info
->read_pos
=info
->buffer
+Count
;
1238 info
->read_end
=info
->buffer
+length
;
1239 info
->pos_in_file
=pos_in_file
;
1240 memcpy(Buffer
,info
->buffer
,(size_t) Count
);
1246 Read data from the current write buffer.
1247 Count should never be == 0 here (The code will work even if count is 0)
1251 /* First copy the data to Count */
1252 size_t len_in_buff
= (size_t) (info
->write_pos
- info
->append_read_pos
);
1254 size_t transfer_len
;
1256 DBUG_ASSERT(info
->append_read_pos
<= info
->write_pos
);
1258 TODO: figure out if the assert below is needed or correct.
1260 DBUG_ASSERT(pos_in_file
== info
->end_of_file
);
1261 copy_len
=min(Count
, len_in_buff
);
1262 memcpy(Buffer
, info
->append_read_pos
, copy_len
);
1263 info
->append_read_pos
+= copy_len
;
1266 info
->error
= save_count
- Count
;
1268 /* Fill read buffer with data from write buffer */
1269 memcpy(info
->buffer
, info
->append_read_pos
,
1270 (size_t) (transfer_len
=len_in_buff
- copy_len
));
1271 info
->read_pos
= info
->buffer
;
1272 info
->read_end
= info
->buffer
+transfer_len
;
1273 info
->append_read_pos
=info
->write_pos
;
1274 info
->pos_in_file
=pos_in_file
+copy_len
;
1275 info
->end_of_file
+=len_in_buff
;
1277 unlock_append_buffer(info
);
1278 return Count
? 1 : 0;
#ifdef HAVE_AIOWAIT

/*
  Read from the IO_CACHE into a buffer and feed asynchronously
  from disk when needed.

  SYNOPSIS
    _my_b_async_read()
      info                      IO_CACHE pointer
      Buffer                    Buffer to retrieve count bytes from file
      Count                     Number of bytes to read into Buffer

  RETURN VALUE
    -1          An error has occurred; my_errno is set.
     0          Success
     1          An error has occurred; IO_CACHE to error state.
*/

int _my_b_async_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
{
  size_t length, read_length, diff_length, left_length, use_length, org_Count;
  size_t max_length;
  my_off_t next_pos_in_file;
  uchar *read_buffer;

  /* Consume whatever is left in the current read buffer first. */
  memcpy(Buffer, info->read_pos,
         (left_length= (size_t) (info->read_end - info->read_pos)));
  Buffer+= left_length;
  org_Count= Count;
  Count-= left_length;

  if (info->inited)
  {						/* wait for read block */
    info->inited= 0;				/* No more block to read */
    my_aiowait(&info->aio_result);		/* Wait for outstanding req */
    if (info->aio_result.result.aio_errno)
    {
      if (info->myflags & MY_WME)
        my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
                 my_filename(info->file),
                 info->aio_result.result.aio_errno);
      my_errno= info->aio_result.result.aio_errno;
      info->error= -1;
      return (1);
    }
    if (! (read_length= (size_t) info->aio_result.result.aio_return) ||
        read_length == (size_t) -1)
    {
      my_errno= 0;				/* For testing */
      info->error= (read_length == (size_t) -1 ? -1 :
                    (int) (read_length + left_length));
      return (1);
    }
    info->pos_in_file+= (size_t) (info->read_end - info->request_pos);

    /* The async read alternates between the two halves of the buffer. */
    if (info->request_pos != info->buffer)
      info->request_pos= info->buffer;
    else
      info->request_pos= info->buffer + info->read_length;
    info->read_pos= info->request_pos;
    next_pos_in_file= info->aio_read_pos + read_length;

    /* Check if pos_in_file is changed
       (_ni_read_cache may have skipped some bytes) */

    if (info->aio_read_pos < info->pos_in_file)
    {						/* Fix if skipped bytes */
      if (info->aio_read_pos + read_length < info->pos_in_file)
      {
        read_length= 0;				/* Skip block */
        next_pos_in_file= info->pos_in_file;
      }
      else
      {
        my_off_t offset= (info->pos_in_file - info->aio_read_pos);
        info->pos_in_file= info->aio_read_pos; /* Whe are here */
        info->read_pos= info->request_pos + offset;
        read_length-= offset;			/* Bytes left from read_pos */
      }
    }
#ifndef DBUG_OFF
    if (info->aio_read_pos > info->pos_in_file)
    {
      my_errno= EINVAL;
      return (info->read_length= (size_t) -1);
    }
#endif
    /* Copy found bytes to buffer */
    length= min(Count, read_length);
    memcpy(Buffer, info->read_pos, (size_t) length);
    Buffer+= length;
    Count-= length;
    left_length+= length;
    /*
      NOTE(review): 'rc_pos' is kept from the original source; this AIO
      path looks stale -- confirm the member name against the IO_CACHE
      declaration before enabling HAVE_AIOWAIT.
    */
    info->read_end= info->rc_pos + read_length;
    info->read_pos+= length;
  }
  else
    next_pos_in_file= (info->pos_in_file + (size_t)
                       (info->read_end - info->request_pos));

  /* If reading large blocks, or first read or read with skip */
  if (Count)
  {
    if (next_pos_in_file == info->end_of_file)
    {
      info->error= (int) (read_length + left_length);
      return 1;
    }

    if (my_seek(info->file, next_pos_in_file, MY_SEEK_SET, MYF(0))
        == MY_FILEPOS_ERROR)
    {
      info->error= -1;
      return (1);
    }

    read_length= IO_SIZE*2 - (size_t) (next_pos_in_file & (IO_SIZE - 1));
    if (Count < read_length)
    {					/* Small block, read to cache */
      if ((read_length= my_read(info->file, info->request_pos,
                                read_length, info->myflags)) == (size_t) -1)
        return info->error= -1;
      use_length= min(Count, read_length);
      memcpy(Buffer, info->request_pos, (size_t) use_length);
      info->read_pos= info->request_pos + Count;
      info->read_end= info->request_pos + read_length;
      info->pos_in_file= next_pos_in_file;	/* Start of block in cache */
      next_pos_in_file+= read_length;

      if (Count != use_length)
      {					/* Didn't find hole block */
        if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
          my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG),
                   my_filename(info->file), my_errno);
        info->error= (int) (read_length + left_length);
        return 1;
      }
    }
    else
    {						/* Big block, don't cache it */
      if ((read_length= my_read(info->file, Buffer, Count, info->myflags))
          != Count)
      {
        info->error= read_length == (size_t) -1 ? -1 :
                     read_length + left_length;
        return 1;
      }
      info->read_pos= info->read_end= info->request_pos;
      info->pos_in_file= (next_pos_in_file+= Count);
    }
  }

  /* Read next block with asyncronic io */
  diff_length= (next_pos_in_file & (IO_SIZE - 1));
  max_length= info->read_length - diff_length;
  if (max_length > info->end_of_file - next_pos_in_file)
    max_length= (size_t) (info->end_of_file - next_pos_in_file);

  if (info->request_pos != info->buffer)
    read_buffer= info->buffer;
  else
    read_buffer= info->buffer + info->read_length;
  info->aio_read_pos= next_pos_in_file;
  if (max_length)
  {
    info->aio_result.result.aio_errno= AIO_INPROGRESS;	/* Marker for test */
    DBUG_PRINT("aioread", ("filepos: %ld  length: %lu",
                           (ulong) next_pos_in_file, (ulong) max_length));
    if (aioread(info->file, read_buffer, max_length,
                (my_off_t) next_pos_in_file, MY_SEEK_SET,
                &info->aio_result.result))
    {						/* Skip async io */
      my_errno= errno;
      DBUG_PRINT("error", ("got error: %d, aio_result: %d from aioread, async skipped",
                           errno, info->aio_result.result.aio_errno));
      if (info->request_pos != info->buffer)
      {
        bmove(info->buffer, info->request_pos,
              (size_t) (info->read_end - info->read_pos));
        info->request_pos= info->buffer;
        info->read_pos-= info->read_length;
        info->read_end-= info->read_length;
      }
      info->read_length= info->buffer_length;	/* Use hole buffer */
      info->read_function= _my_b_read;		/* Use normal IO_READ next */
    }
    else
      info->inited= info->aio_result.pending= 1;
  }
  return 0;					/* Block read, async in use */
}	/* _my_b_async_read */
#endif
1475 /* Read one byte when buffer is empty */
1477 int _my_b_get(IO_CACHE
*info
)
1480 IO_CACHE_CALLBACK pre_read
,post_read
;
1481 if ((pre_read
= info
->pre_read
))
1483 if ((*(info
)->read_function
)(info
,&buff
,1))
1485 if ((post_read
= info
->post_read
))
1487 return (int) (uchar
) buff
;
1491 Write a byte buffer to IO_CACHE and flush to disk
1492 if IO_CACHE is full.
1497 -1 On error; my_errno contains error code.
1500 int _my_b_write(register IO_CACHE
*info
, const uchar
*Buffer
, size_t Count
)
1502 size_t rest_length
,length
;
1504 if (info
->pos_in_file
+info
->buffer_length
> info
->end_of_file
)
1506 my_errno
=errno
=EFBIG
;
1507 return info
->error
= -1;
1510 rest_length
= (size_t) (info
->write_end
- info
->write_pos
);
1511 memcpy(info
->write_pos
,Buffer
,(size_t) rest_length
);
1512 Buffer
+=rest_length
;
1514 info
->write_pos
+=rest_length
;
1516 if (my_b_flush_io_cache(info
,1))
1518 if (Count
>= IO_SIZE
)
1519 { /* Fill first intern buffer */
1520 length
=Count
& (size_t) ~(IO_SIZE
-1);
1521 if (info
->seek_not_done
)
1524 Whenever a function which operates on IO_CACHE flushes/writes
1525 some part of the IO_CACHE to disk it will set the property
1526 "seek_not_done" to indicate this to other functions operating
1529 if (my_seek(info
->file
,info
->pos_in_file
,MY_SEEK_SET
,MYF(0)))
1534 info
->seek_not_done
=0;
1536 if (my_write(info
->file
, Buffer
, length
, info
->myflags
| MY_NABP
))
1537 return info
->error
= -1;
1541 In case of a shared I/O cache with a writer we normally do direct
1542 write cache to read cache copy. Simulate this here by direct
1543 caller buffer to read cache copy. Do it after the write so that
1544 the cache readers actions on the flushed part can go in parallel
1545 with the write of the extra stuff. copy_to_read_buffer()
1546 synchronizes writer and readers so that after this call the
1547 readers can act on the extra stuff while the writer can go ahead
1548 and prepare the next output. copy_to_read_buffer() relies on
1552 copy_to_read_buffer(info
, Buffer
, length
);
1557 info
->pos_in_file
+=length
;
1559 memcpy(info
->write_pos
,Buffer
,(size_t) Count
);
1560 info
->write_pos
+=Count
;
1566 Append a block to the write buffer.
1567 This is done with the buffer locked to ensure that we don't read from
1568 the write buffer before we are ready with it.
1571 int my_b_append(register IO_CACHE
*info
, const uchar
*Buffer
, size_t Count
)
1573 size_t rest_length
,length
;
1577 Assert that we cannot come here with a shared cache. If we do one
1578 day, we might need to add a call to copy_to_read_buffer().
1580 DBUG_ASSERT(!info
->share
);
1583 lock_append_buffer(info
);
1584 rest_length
= (size_t) (info
->write_end
- info
->write_pos
);
1585 if (Count
<= rest_length
)
1587 memcpy(info
->write_pos
, Buffer
, rest_length
);
1588 Buffer
+=rest_length
;
1590 info
->write_pos
+=rest_length
;
1591 if (my_b_flush_io_cache(info
,0))
1593 unlock_append_buffer(info
);
1596 if (Count
>= IO_SIZE
)
1597 { /* Fill first intern buffer */
1598 length
=Count
& (size_t) ~(IO_SIZE
-1);
1599 if (my_write(info
->file
,Buffer
, length
, info
->myflags
| MY_NABP
))
1601 unlock_append_buffer(info
);
1602 return info
->error
= -1;
1606 info
->end_of_file
+=length
;
1610 memcpy(info
->write_pos
,Buffer
,(size_t) Count
);
1611 info
->write_pos
+=Count
;
1612 unlock_append_buffer(info
);
1617 int my_b_safe_write(IO_CACHE
*info
, const uchar
*Buffer
, size_t Count
)
1620 Sasha: We are not writing this with the ? operator to avoid hitting
1621 a possible compiler bug. At least gcc 2.95 cannot deal with
1622 several layers of ternary operators that evaluated comma(,) operator
1623 expressions inside - I do have a test case if somebody wants it
1625 if (info
->type
== SEQ_READ_APPEND
)
1626 return my_b_append(info
, Buffer
, Count
);
1627 return my_b_write(info
, Buffer
, Count
);
1632 Write a block to disk where part of the data may be inside the record
1633 buffer. As all write calls to the data goes through the cache,
1634 we will never get a seek over the end of the buffer
1637 int my_block_write(register IO_CACHE
*info
, const uchar
*Buffer
, size_t Count
,
1645 Assert that we cannot come here with a shared cache. If we do one
1646 day, we might need to add a call to copy_to_read_buffer().
1648 DBUG_ASSERT(!info
->share
);
1651 if (pos
< info
->pos_in_file
)
1653 /* Of no overlap, write everything without buffering */
1654 if (pos
+ Count
<= info
->pos_in_file
)
1655 return my_pwrite(info
->file
, Buffer
, Count
, pos
,
1656 info
->myflags
| MY_NABP
);
1657 /* Write the part of the block that is before buffer */
1658 length
= (uint
) (info
->pos_in_file
- pos
);
1659 if (my_pwrite(info
->file
, Buffer
, length
, pos
, info
->myflags
| MY_NABP
))
1660 info
->error
= error
= -1;
1665 info
->seek_not_done
=1;
1669 /* Check if we want to write inside the used part of the buffer.*/
1670 length
= (size_t) (info
->write_end
- info
->buffer
);
1671 if (pos
< info
->pos_in_file
+ length
)
1673 size_t offset
= (size_t) (pos
- info
->pos_in_file
);
1677 memcpy(info
->buffer
+offset
, Buffer
, length
);
1680 /* Fix length of buffer if the new data was larger */
1681 if (info
->buffer
+length
> info
->write_pos
)
1682 info
->write_pos
=info
->buffer
+length
;
1686 /* Write at the end of the current buffer; This is the normal case */
1687 if (_my_b_write(info
, Buffer
, Count
))
/* Flush write cache */

#ifdef THREAD
/* Conditionally take the append lock only when the caller asked for it. */
#define LOCK_APPEND_BUFFER if (need_append_buffer_lock) \
 lock_append_buffer(info);
#define UNLOCK_APPEND_BUFFER if (need_append_buffer_lock) \
 unlock_append_buffer(info);
#else
#define LOCK_APPEND_BUFFER
#define UNLOCK_APPEND_BUFFER
#endif
1706 int my_b_flush_io_cache(IO_CACHE
*info
,
1707 int need_append_buffer_lock
__attribute__((unused
)))
1710 my_off_t pos_in_file
;
1711 my_bool append_cache
= (info
->type
== SEQ_READ_APPEND
);
1712 DBUG_ENTER("my_b_flush_io_cache");
1713 DBUG_PRINT("enter", ("cache: 0x%lx", (long) info
));
1717 need_append_buffer_lock
= 0;
1720 if (info
->type
== WRITE_CACHE
|| append_cache
)
1722 if (info
->file
== -1)
1724 if (real_open_cached_file(info
))
1725 DBUG_RETURN((info
->error
= -1));
1729 if ((length
=(size_t) (info
->write_pos
- info
->write_buffer
)))
1733 In case of a shared I/O cache with a writer we do direct write
1734 cache to read cache copy. Do it before the write here so that
1735 the readers can work in parallel with the write.
1736 copy_to_read_buffer() relies on info->pos_in_file.
1739 copy_to_read_buffer(info
, info
->write_buffer
, length
);
1742 pos_in_file
=info
->pos_in_file
;
1744 If we have append cache, we always open the file with
1745 O_APPEND which moves the pos to EOF automatically on every write
1747 if (!append_cache
&& info
->seek_not_done
)
1748 { /* File touched, do seek */
1749 if (my_seek(info
->file
,pos_in_file
,MY_SEEK_SET
,MYF(0)) ==
1752 UNLOCK_APPEND_BUFFER
;
1753 DBUG_RETURN((info
->error
= -1));
1756 info
->seek_not_done
=0;
1759 info
->pos_in_file
+=length
;
1760 info
->write_end
= (info
->write_buffer
+info
->buffer_length
-
1761 ((pos_in_file
+length
) & (IO_SIZE
-1)));
1763 if (my_write(info
->file
,info
->write_buffer
,length
,
1764 info
->myflags
| MY_NABP
))
1770 set_if_bigger(info
->end_of_file
,(pos_in_file
+length
));
1774 info
->end_of_file
+=(info
->write_pos
-info
->append_read_pos
);
1775 DBUG_ASSERT(info
->end_of_file
== my_tell(info
->file
,MYF(0)));
1778 info
->append_read_pos
=info
->write_pos
=info
->write_buffer
;
1779 ++info
->disk_writes
;
1780 UNLOCK_APPEND_BUFFER
;
1781 DBUG_RETURN(info
->error
);
1785 else if (info
->type
!= READ_NET
)
1787 my_aiowait(&info
->aio_result
); /* Wait for outstanding req */
1791 UNLOCK_APPEND_BUFFER
;
1796 Free an IO_CACHE object
1800 info IO_CACHE Handle to free
1803 It's currently safe to call this if one has called init_io_cache()
1804 on the 'info' object, even if init_io_cache() failed.
1805 This function is also safe to call twice with the same handle.
1812 int end_io_cache(IO_CACHE
*info
)
1815 IO_CACHE_CALLBACK pre_close
;
1816 DBUG_ENTER("end_io_cache");
1817 DBUG_PRINT("enter",("cache: 0x%lx", (ulong
) info
));
1821 Every thread must call remove_io_thread(). The last one destroys
1824 DBUG_ASSERT(!info
->share
|| !info
->share
->total_threads
);
1827 if ((pre_close
=info
->pre_close
))
1832 if (info
->alloced_buffer
)
1834 info
->alloced_buffer
=0;
1835 if (info
->file
!= -1) /* File doesn't exist */
1836 error
= my_b_flush_io_cache(info
,1);
1837 my_free((uchar
*) info
->buffer
,MYF(MY_WME
));
1838 info
->buffer
=info
->read_pos
=(uchar
*) 0;
1840 if (info
->type
== SEQ_READ_APPEND
)
1842 /* Destroy allocated mutex */
1843 info
->type
= TYPE_NOT_SET
;
1845 pthread_mutex_destroy(&info
->append_buffer_lock
);
1849 } /* end_io_cache */
1852 /**********************************************************************
1853 Testing of MF_IOCACHE
1854 **********************************************************************/
/* Print a formatted error message (with errno) to stderr and abort. */
void die(const char* fmt, ...)
{
  va_list va_args;
  va_start(va_args, fmt);
  fprintf(stderr, "Error:");
  vfprintf(stderr, fmt, va_args);
  fprintf(stderr, ", errno=%d\n", errno);
  va_end(va_args);
  exit(1);
}
1870 int open_file(const char* fname
, IO_CACHE
* info
, int cache_size
)
1873 if ((fd
=my_open(fname
,O_CREAT
| O_RDWR
,MYF(MY_WME
))) < 0)
1874 die("Could not open %s", fname
);
1875 if (init_io_cache(info
, fd
, cache_size
, SEQ_READ_APPEND
, 0,0,MYF(MY_WME
)))
1876 die("failed in init_io_cache()");
1880 void close_file(IO_CACHE
* info
)
1883 my_close(info
->file
, MYF(MY_WME
));
1886 int main(int argc
, char** argv
)
1888 IO_CACHE sra_cache
; /* SEQ_READ_APPEND */
1890 const char* fname
="/tmp/iocache.test";
1891 int cache_size
=16384;
1893 int max_block
,total_bytes
=0;
1894 int i
,num_loops
=100,error
=0;
1896 char* block
, *block_end
;
1898 max_block
= cache_size
*3;
1899 if (!(block
=(char*)my_malloc(max_block
,MYF(MY_WME
))))
1900 die("Not enough memory to allocate test block");
1901 block_end
= block
+ max_block
;
1902 for (p
= block
,i
=0; p
< block_end
;i
++)
1906 if (my_stat(fname
,&status
, MYF(0)) &&
1907 my_delete(fname
,MYF(MY_WME
)))
1909 die("Delete of %s failed, aborting", fname
);
1911 open_file(fname
,&sra_cache
, cache_size
);
1912 for (i
= 0; i
< num_loops
; i
++)
1915 int block_size
= abs(rand() % max_block
);
1916 int4store(buf
, block_size
);
1917 if (my_b_append(&sra_cache
,buf
,4) ||
1918 my_b_append(&sra_cache
, block
, block_size
))
1919 die("write failed");
1920 total_bytes
+= 4+block_size
;
1922 close_file(&sra_cache
);
1923 my_free(block
,MYF(MY_WME
));
1924 if (!my_stat(fname
,&status
,MYF(MY_WME
)))
1925 die("%s failed to stat, but I had just closed it,\
1926 wonder how that happened");
1927 printf("Final size of %s is %s, wrote %d bytes\n",fname
,
1928 llstr(status
.st_size
,llstr_buf
),
1930 my_delete(fname
, MYF(MY_WME
));
1931 /* check correctness of tests */
1932 if (total_bytes
!= status
.st_size
)
1934 fprintf(stderr
,"Not the same number of bytes acutally in file as bytes \
1935 supposedly written\n");