mySQL 5.0.11 sources for tomato
[tomato.git] / release / src / router / mysql / mysys / mf_iocache.c
blob82dac7e0956aecb084079396ec3694b522ca471d
1 /*
2 Copyright (c) 2000, 2010, Oracle and/or its affiliates. All rights reserved.
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 Caching of files that only does (sequential) reads or writes of fixed-
20 length records. A read isn't allowed to go over file-length. A read is ok
21 if it ends at file-length and next read can try to read after file-length
22 (and get a EOF-error).
23 Possible use of asynchronous io.
24 macros for read and writes for faster io.
25 Used instead of FILE when reading or writing whole files.
26 This code makes mf_rec_cache obsolete (currently only used by ISAM)
27 One can change info->pos_in_file to a higher value to skip bytes in file if
28 also info->read_pos is set to info->read_end.
29 If called through open_cached_file(), then the temporary file will
30 only be created if a write exceeds the file buffer or if one calls
31 my_b_flush_io_cache().
33 If one uses SEQ_READ_APPEND, then two buffers are allocated, one for
34 reading and another for writing. Reads are first done from disk and
35 then done from the write buffer. This is an efficient way to read
36 from a log file when one is writing to it at the same time.
37 For this to work, the file has to be opened in append mode!
38 Note that when one uses SEQ_READ_APPEND, one MUST write using
39 my_b_append ! This is needed because we need to lock the mutex
40 every time we access the write buffer.
42 TODO:
43 When one SEQ_READ_APPEND and we are reading and writing at the same time,
44 each time the write buffer gets full and it's written to disk, we will
45 always do a disk read to read a part of the buffer from disk to the
46 read buffer.
47 This should be fixed so that when we do a my_b_flush_io_cache() and
48 we have been reading the write buffer, we should transfer the rest of the
49 write buffer to the read buffer before we start to reuse it.
52 #define MAP_TO_USE_RAID
53 #include "mysys_priv.h"
54 #include <m_string.h>
55 #ifdef HAVE_AIOWAIT
56 #include "mysys_err.h"
57 static void my_aiowait(my_aio_result *result);
58 #endif
59 #include <errno.h>
61 #ifdef THREAD
62 #define lock_append_buffer(info) \
63 pthread_mutex_lock(&(info)->append_buffer_lock)
64 #define unlock_append_buffer(info) \
65 pthread_mutex_unlock(&(info)->append_buffer_lock)
66 #else
67 #define lock_append_buffer(info)
68 #define unlock_append_buffer(info)
69 #endif
71 #define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
72 #define IO_ROUND_DN(X) ( (X) & ~(IO_SIZE-1))
75 Setup internal pointers inside IO_CACHE
77 SYNOPSIS
78 setup_io_cache()
79 info IO_CACHE handler
81 NOTES
82 This is called on automaticly on init or reinit of IO_CACHE
83 It must be called externally if one moves or copies an IO_CACHE
84 object.
87 void setup_io_cache(IO_CACHE* info)
89 /* Ensure that my_b_tell() and my_b_bytes_in_cache works */
90 if (info->type == WRITE_CACHE)
92 info->current_pos= &info->write_pos;
93 info->current_end= &info->write_end;
95 else
97 info->current_pos= &info->read_pos;
98 info->current_end= &info->read_end;
103 static void
104 init_functions(IO_CACHE* info)
106 enum cache_type type= info->type;
107 switch (type) {
108 case READ_NET:
110 Must be initialized by the caller. The problem is that
111 _my_b_net_read has to be defined in sql directory because of
112 the dependency on THD, and therefore cannot be visible to
113 programs that link against mysys but know nothing about THD, such
114 as myisamchk
116 break;
117 case SEQ_READ_APPEND:
118 info->read_function = _my_b_seq_read;
119 info->write_function = 0; /* Force a core if used */
120 break;
121 default:
122 info->read_function =
123 #ifdef THREAD
124 info->share ? _my_b_read_r :
125 #endif
126 _my_b_read;
127 info->write_function = _my_b_write;
130 setup_io_cache(info);
135 Initialize an IO_CACHE object
137 SYNOPSOS
138 init_io_cache()
139 info cache handler to initialize
140 file File that should be associated to to the handler
141 If == -1 then real_open_cached_file()
142 will be called when it's time to open file.
143 cachesize Size of buffer to allocate for read/write
144 If == 0 then use my_default_record_cache_size
145 type Type of cache
146 seek_offset Where cache should start reading/writing
147 use_async_io Set to 1 of we should use async_io (if avaiable)
148 cache_myflags Bitmap of differnt flags
149 MY_WME | MY_FAE | MY_NABP | MY_FNABP |
150 MY_DONT_CHECK_FILESIZE
152 RETURN
153 0 ok
154 # error
157 int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
158 enum cache_type type, my_off_t seek_offset,
159 pbool use_async_io, myf cache_myflags)
161 size_t min_cache;
162 my_off_t pos;
163 my_off_t end_of_file= ~(my_off_t) 0;
164 DBUG_ENTER("init_io_cache");
165 DBUG_PRINT("enter",("cache: 0x%lx type: %d pos: %ld",
166 (ulong) info, (int) type, (ulong) seek_offset));
168 info->file= file;
169 info->type= TYPE_NOT_SET; /* Don't set it until mutex are created */
170 info->pos_in_file= seek_offset;
171 info->pre_close = info->pre_read = info->post_read = 0;
172 info->arg = 0;
173 info->alloced_buffer = 0;
174 info->buffer=0;
175 info->seek_not_done= 0;
177 if (file >= 0)
179 pos= my_tell(file, MYF(0));
180 if ((pos == (my_off_t) -1) && (my_errno == ESPIPE))
183 This kind of object doesn't support seek() or tell(). Don't set a
184 flag that will make us again try to seek() later and fail.
186 info->seek_not_done= 0;
188 Additionally, if we're supposed to start somewhere other than the
189 the beginning of whatever this file is, then somebody made a bad
190 assumption.
192 DBUG_ASSERT(seek_offset == 0);
194 else
195 info->seek_not_done= test(seek_offset != pos);
198 info->disk_writes= 0;
199 #ifdef THREAD
200 info->share=0;
201 #endif
203 if (!cachesize && !(cachesize= my_default_record_cache_size))
204 DBUG_RETURN(1); /* No cache requested */
205 min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
206 if (type == READ_CACHE || type == SEQ_READ_APPEND)
207 { /* Assume file isn't growing */
208 if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
210 /* Calculate end of file to avoid allocating oversized buffers */
211 end_of_file=my_seek(file,0L,MY_SEEK_END,MYF(0));
212 /* Need to reset seek_not_done now that we just did a seek. */
213 info->seek_not_done= end_of_file == seek_offset ? 0 : 1;
214 if (end_of_file < seek_offset)
215 end_of_file=seek_offset;
216 /* Trim cache size if the file is very small */
217 if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
219 cachesize= (size_t) (end_of_file-seek_offset)+IO_SIZE*2-1;
220 use_async_io=0; /* No need to use async */
224 cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
225 if (type != READ_NET && type != WRITE_NET)
227 /* Retry allocating memory in smaller blocks until we get one */
228 cachesize= ((cachesize + min_cache-1) & ~(min_cache-1));
229 for (;;)
231 size_t buffer_block;
233 Unset MY_WAIT_IF_FULL bit if it is set, to prevent conflict with
234 MY_ZEROFILL.
236 myf flags= (myf) (cache_myflags & ~(MY_WME | MY_WAIT_IF_FULL));
238 if (cachesize < min_cache)
239 cachesize = min_cache;
240 buffer_block= cachesize;
241 if (type == SEQ_READ_APPEND)
242 buffer_block *= 2;
243 if (cachesize == min_cache)
244 flags|= (myf) MY_WME;
246 if ((info->buffer= (uchar*) my_malloc(buffer_block, flags)) != 0)
248 info->write_buffer=info->buffer;
249 if (type == SEQ_READ_APPEND)
250 info->write_buffer = info->buffer + cachesize;
251 info->alloced_buffer=1;
252 break; /* Enough memory found */
254 if (cachesize == min_cache)
255 DBUG_RETURN(2); /* Can't alloc cache */
256 /* Try with less memory */
257 cachesize= (cachesize*3/4 & ~(min_cache-1));
261 DBUG_PRINT("info",("init_io_cache: cachesize = %lu", (ulong) cachesize));
262 info->read_length=info->buffer_length=cachesize;
263 info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
264 info->request_pos= info->read_pos= info->write_pos = info->buffer;
265 if (type == SEQ_READ_APPEND)
267 info->append_read_pos = info->write_pos = info->write_buffer;
268 info->write_end = info->write_buffer + info->buffer_length;
269 #ifdef THREAD
270 pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
271 #endif
273 #if defined(SAFE_MUTEX) && defined(THREAD)
274 else
276 /* Clear mutex so that safe_mutex will notice that it's not initialized */
277 bzero((char*) &info->append_buffer_lock, sizeof(info));
279 #endif
281 if (type == WRITE_CACHE)
282 info->write_end=
283 info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
284 else
285 info->read_end=info->buffer; /* Nothing in cache */
287 /* End_of_file may be changed by user later */
288 info->end_of_file= end_of_file;
289 info->error=0;
290 info->type= type;
291 init_functions(info);
292 #ifdef HAVE_AIOWAIT
293 if (use_async_io && ! my_disable_async_io)
295 DBUG_PRINT("info",("Using async io"));
296 info->read_length/=2;
297 info->read_function=_my_b_async_read;
299 info->inited=info->aio_result.pending=0;
300 #endif
301 DBUG_RETURN(0);
302 } /* init_io_cache */
304 /* Wait until current request is ready */
306 #ifdef HAVE_AIOWAIT
307 static void my_aiowait(my_aio_result *result)
309 if (result->pending)
311 struct aio_result_t *tmp;
312 for (;;)
314 if ((int) (tmp=aiowait((struct timeval *) 0)) == -1)
316 if (errno == EINTR)
317 continue;
318 DBUG_PRINT("error",("No aio request, error: %d",errno));
319 result->pending=0; /* Assume everythings is ok */
320 break;
322 ((my_aio_result*) tmp)->pending=0;
323 if ((my_aio_result*) tmp == result)
324 break;
327 return;
329 #endif
333 Use this to reset cache to re-start reading or to change the type
334 between READ_CACHE <-> WRITE_CACHE
335 If we are doing a reinit of a cache where we have the start of the file
336 in the cache, we are reusing this memory without flushing it to disk.
339 my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
340 my_off_t seek_offset,
341 pbool use_async_io __attribute__((unused)),
342 pbool clear_cache)
344 DBUG_ENTER("reinit_io_cache");
345 DBUG_PRINT("enter",("cache: 0x%lx type: %d seek_offset: %lu clear_cache: %d",
346 (ulong) info, type, (ulong) seek_offset,
347 (int) clear_cache));
349 /* One can't do reinit with the following types */
350 DBUG_ASSERT(type != READ_NET && info->type != READ_NET &&
351 type != WRITE_NET && info->type != WRITE_NET &&
352 type != SEQ_READ_APPEND && info->type != SEQ_READ_APPEND);
354 /* If the whole file is in memory, avoid flushing to disk */
355 if (! clear_cache &&
356 seek_offset >= info->pos_in_file &&
357 seek_offset <= my_b_tell(info))
359 /* Reuse current buffer without flushing it to disk */
360 uchar *pos;
361 if (info->type == WRITE_CACHE && type == READ_CACHE)
363 info->read_end=info->write_pos;
364 info->end_of_file=my_b_tell(info);
366 Trigger a new seek only if we have a valid
367 file handle.
369 info->seek_not_done= (info->file != -1);
371 else if (type == WRITE_CACHE)
373 if (info->type == READ_CACHE)
375 info->write_end=info->write_buffer+info->buffer_length;
376 info->seek_not_done=1;
378 info->end_of_file = ~(my_off_t) 0;
380 pos=info->request_pos+(seek_offset-info->pos_in_file);
381 if (type == WRITE_CACHE)
382 info->write_pos=pos;
383 else
384 info->read_pos= pos;
385 #ifdef HAVE_AIOWAIT
386 my_aiowait(&info->aio_result); /* Wait for outstanding req */
387 #endif
389 else
392 If we change from WRITE_CACHE to READ_CACHE, assume that everything
393 after the current positions should be ignored
395 if (info->type == WRITE_CACHE && type == READ_CACHE)
396 info->end_of_file=my_b_tell(info);
397 /* flush cache if we want to reuse it */
398 if (!clear_cache && my_b_flush_io_cache(info,1))
399 DBUG_RETURN(1);
400 info->pos_in_file=seek_offset;
401 /* Better to do always do a seek */
402 info->seek_not_done=1;
403 info->request_pos=info->read_pos=info->write_pos=info->buffer;
404 if (type == READ_CACHE)
406 info->read_end=info->buffer; /* Nothing in cache */
408 else
410 info->write_end=(info->buffer + info->buffer_length -
411 (seek_offset & (IO_SIZE-1)));
412 info->end_of_file= ~(my_off_t) 0;
415 info->type=type;
416 info->error=0;
417 init_functions(info);
419 #ifdef HAVE_AIOWAIT
420 if (use_async_io && ! my_disable_async_io &&
421 ((ulong) info->buffer_length <
422 (ulong) (info->end_of_file - seek_offset)))
424 info->read_length=info->buffer_length/2;
425 info->read_function=_my_b_async_read;
427 info->inited=0;
428 #endif
429 DBUG_RETURN(0);
430 } /* reinit_io_cache */
435 Read buffered.
437 SYNOPSIS
438 _my_b_read()
439 info IO_CACHE pointer
440 Buffer Buffer to retrieve count bytes from file
441 Count Number of bytes to read into Buffer
443 NOTE
444 This function is only called from the my_b_read() macro when there
445 isn't enough characters in the buffer to satisfy the request.
447 WARNING
449 When changing this function, be careful with handling file offsets
450 (end-of_file, pos_in_file). Do not cast them to possibly smaller
451 types than my_off_t unless you can be sure that their value fits.
452 Same applies to differences of file offsets.
454 When changing this function, check _my_b_read_r(). It might need the
455 same change.
457 RETURN
458 0 we succeeded in reading all data
459 1 Error: can't read requested characters
462 int _my_b_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
464 size_t length,diff_length,left_length, max_length;
465 my_off_t pos_in_file;
466 DBUG_ENTER("_my_b_read");
468 if ((left_length= (size_t) (info->read_end-info->read_pos)))
470 DBUG_ASSERT(Count >= left_length); /* User is not using my_b_read() */
471 memcpy(Buffer,info->read_pos, left_length);
472 Buffer+=left_length;
473 Count-=left_length;
476 /* pos_in_file always point on where info->buffer was read */
477 pos_in_file=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
480 Whenever a function which operates on IO_CACHE flushes/writes
481 some part of the IO_CACHE to disk it will set the property
482 "seek_not_done" to indicate this to other functions operating
483 on the IO_CACHE.
485 if (info->seek_not_done)
487 if ((my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0))
488 != MY_FILEPOS_ERROR))
490 /* No error, reset seek_not_done flag. */
491 info->seek_not_done= 0;
493 else
496 If the seek failed and the error number is ESPIPE, it is because
497 info->file is a pipe or socket or FIFO. We never should have tried
498 to seek on that. See Bugs#25807 and #22828 for more info.
500 DBUG_ASSERT(my_errno != ESPIPE);
501 info->error= -1;
502 DBUG_RETURN(1);
506 diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
507 if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
508 { /* Fill first intern buffer */
509 size_t read_length;
510 if (info->end_of_file <= pos_in_file)
511 { /* End of file */
512 info->error= (int) left_length;
513 DBUG_RETURN(1);
515 length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
516 if ((read_length= my_read(info->file,Buffer, length, info->myflags))
517 != length)
519 info->error= (read_length == (size_t) -1 ? -1 :
520 (int) (read_length+left_length));
521 DBUG_RETURN(1);
523 Count-=length;
524 Buffer+=length;
525 pos_in_file+=length;
526 left_length+=length;
527 diff_length=0;
530 max_length= info->read_length-diff_length;
531 if (info->type != READ_FIFO &&
532 max_length > (info->end_of_file - pos_in_file))
533 max_length= (size_t) (info->end_of_file - pos_in_file);
534 if (!max_length)
536 if (Count)
538 info->error= left_length; /* We only got this many char */
539 DBUG_RETURN(1);
541 length=0; /* Didn't read any chars */
543 else if ((length= my_read(info->file,info->buffer, max_length,
544 info->myflags)) < Count ||
545 length == (size_t) -1)
547 if (length != (size_t) -1)
548 memcpy(Buffer, info->buffer, length);
549 info->pos_in_file= pos_in_file;
550 info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
551 info->read_pos=info->read_end=info->buffer;
552 DBUG_RETURN(1);
554 info->read_pos=info->buffer+Count;
555 info->read_end=info->buffer+length;
556 info->pos_in_file=pos_in_file;
557 memcpy(Buffer, info->buffer, Count);
558 DBUG_RETURN(0);
562 #ifdef THREAD
564 Prepare IO_CACHE for shared use.
566 SYNOPSIS
567 init_io_cache_share()
568 read_cache A read cache. This will be copied for
569 every thread after setup.
570 cshare The share.
571 write_cache If non-NULL a write cache that is to be
572 synchronized with the read caches.
573 num_threads Number of threads sharing the cache
574 including the write thread if any.
576 DESCRIPTION
578 The shared cache is used so: One IO_CACHE is initialized with
579 init_io_cache(). This includes the allocation of a buffer. Then a
580 share is allocated and init_io_cache_share() is called with the io
581 cache and the share. Then the io cache is copied for each thread. So
582 every thread has its own copy of IO_CACHE. But the allocated buffer
583 is shared because cache->buffer is the same for all caches.
585 One thread reads data from the file into the buffer. All threads
586 read from the buffer, but every thread maintains its own set of
587 pointers into the buffer. When all threads have used up the buffer
588 contents, one of the threads reads the next block of data into the
589 buffer. To accomplish this, each thread enters the cache lock before
590 accessing the buffer. They wait in lock_io_cache() until all threads
591 joined the lock. The last thread entering the lock is in charge of
592 reading from file to buffer. It wakes all threads when done.
594 Synchronizing a write cache to the read caches works so: Whenever
595 the write buffer needs a flush, the write thread enters the lock and
596 waits for all other threads to enter the lock too. They do this when
597 they have used up the read buffer. When all threads are in the lock,
598 the write thread copies the write buffer to the read buffer and
599 wakes all threads.
601 share->running_threads is the number of threads not being in the
602 cache lock. When entering lock_io_cache() the number is decreased.
603 When the thread that fills the buffer enters unlock_io_cache() the
604 number is reset to the number of threads. The condition
605 running_threads == 0 means that all threads are in the lock. Bumping
606 up the number to the full count is non-intuitive. But increasing the
607 number by one for each thread that leaves the lock could lead to a
608 solo run of one thread. The last thread to join a lock reads from
609 file to buffer, wakes the other threads, processes the data in the
610 cache and enters the lock again. If no other thread left the lock
611 meanwhile, it would think it's the last one again and read the next
612 block...
614 The share has copies of 'error', 'buffer', 'read_end', and
615 'pos_in_file' from the thread that filled the buffer. We may not be
616 able to access this information directly from its cache because the
617 thread may be removed from the share before the variables could be
618 copied by all other threads. Or, if a write buffer is synchronized,
619 it would change its 'pos_in_file' after waking the other threads,
620 possibly before they could copy its value.
622 However, the 'buffer' variable in the share is for a synchronized
623 write cache. It needs to know where to put the data. Otherwise it
624 would need access to the read cache of one of the threads that is
625 not yet removed from the share.
627 RETURN
628 void
631 void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
632 IO_CACHE *write_cache, uint num_threads)
634 DBUG_ENTER("init_io_cache_share");
635 DBUG_PRINT("io_cache_share", ("read_cache: 0x%lx share: 0x%lx "
636 "write_cache: 0x%lx threads: %u",
637 (long) read_cache, (long) cshare,
638 (long) write_cache, num_threads));
640 DBUG_ASSERT(num_threads > 1);
641 DBUG_ASSERT(read_cache->type == READ_CACHE);
642 DBUG_ASSERT(!write_cache || (write_cache->type == WRITE_CACHE));
644 pthread_mutex_init(&cshare->mutex, MY_MUTEX_INIT_FAST);
645 pthread_cond_init(&cshare->cond, 0);
646 pthread_cond_init(&cshare->cond_writer, 0);
648 cshare->running_threads= num_threads;
649 cshare->total_threads= num_threads;
650 cshare->error= 0; /* Initialize. */
651 cshare->buffer= read_cache->buffer;
652 cshare->read_end= NULL; /* See function comment of lock_io_cache(). */
653 cshare->pos_in_file= 0; /* See function comment of lock_io_cache(). */
654 cshare->source_cache= write_cache; /* Can be NULL. */
656 read_cache->share= cshare;
657 read_cache->read_function= _my_b_read_r;
658 read_cache->current_pos= NULL;
659 read_cache->current_end= NULL;
661 if (write_cache)
662 write_cache->share= cshare;
664 DBUG_VOID_RETURN;
669 Remove a thread from shared access to IO_CACHE.
671 SYNOPSIS
672 remove_io_thread()
673 cache The IO_CACHE to be removed from the share.
675 NOTE
677 Every thread must do that on exit for not to deadlock other threads.
679 The last thread destroys the pthread resources.
681 A writer flushes its cache first.
683 RETURN
684 void
687 void remove_io_thread(IO_CACHE *cache)
689 IO_CACHE_SHARE *cshare= cache->share;
690 uint total;
691 DBUG_ENTER("remove_io_thread");
693 /* If the writer goes, it needs to flush the write cache. */
694 if (cache == cshare->source_cache)
695 flush_io_cache(cache);
697 pthread_mutex_lock(&cshare->mutex);
698 DBUG_PRINT("io_cache_share", ("%s: 0x%lx",
699 (cache == cshare->source_cache) ?
700 "writer" : "reader", (long) cache));
702 /* Remove from share. */
703 total= --cshare->total_threads;
704 DBUG_PRINT("io_cache_share", ("remaining threads: %u", total));
706 /* Detach from share. */
707 cache->share= NULL;
709 /* If the writer goes, let the readers know. */
710 if (cache == cshare->source_cache)
712 DBUG_PRINT("io_cache_share", ("writer leaves"));
713 cshare->source_cache= NULL;
716 /* If all threads are waiting for me to join the lock, wake them. */
717 if (!--cshare->running_threads)
719 DBUG_PRINT("io_cache_share", ("the last running thread leaves, wake all"));
720 pthread_cond_signal(&cshare->cond_writer);
721 pthread_cond_broadcast(&cshare->cond);
724 pthread_mutex_unlock(&cshare->mutex);
726 if (!total)
728 DBUG_PRINT("io_cache_share", ("last thread removed, destroy share"));
729 pthread_cond_destroy (&cshare->cond_writer);
730 pthread_cond_destroy (&cshare->cond);
731 pthread_mutex_destroy(&cshare->mutex);
734 DBUG_VOID_RETURN;
739 Lock IO cache and wait for all other threads to join.
741 SYNOPSIS
742 lock_io_cache()
743 cache The cache of the thread entering the lock.
744 pos File position of the block to read.
745 Unused for the write thread.
747 DESCRIPTION
749 Wait for all threads to finish with the current buffer. We want
750 all threads to proceed in concert. The last thread to join
751 lock_io_cache() will read the block from file and all threads start
752 to use it. Then they will join again for reading the next block.
754 The waiting threads detect a fresh buffer by comparing
755 cshare->pos_in_file with the position they want to process next.
756 Since the first block may start at position 0, we take
757 cshare->read_end as an additional condition. This variable is
758 initialized to NULL and will be set after a block of data is written
759 to the buffer.
761 RETURN
762 1 OK, lock in place, go ahead and read.
763 0 OK, unlocked, another thread did the read.
766 static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
768 IO_CACHE_SHARE *cshare= cache->share;
769 DBUG_ENTER("lock_io_cache");
771 /* Enter the lock. */
772 pthread_mutex_lock(&cshare->mutex);
773 cshare->running_threads--;
774 DBUG_PRINT("io_cache_share", ("%s: 0x%lx pos: %lu running: %u",
775 (cache == cshare->source_cache) ?
776 "writer" : "reader", (long) cache, (ulong) pos,
777 cshare->running_threads));
779 if (cshare->source_cache)
781 /* A write cache is synchronized to the read caches. */
783 if (cache == cshare->source_cache)
785 /* The writer waits until all readers are here. */
786 while (cshare->running_threads)
788 DBUG_PRINT("io_cache_share", ("writer waits in lock"));
789 pthread_cond_wait(&cshare->cond_writer, &cshare->mutex);
791 DBUG_PRINT("io_cache_share", ("writer awoke, going to copy"));
793 /* Stay locked. Leave the lock later by unlock_io_cache(). */
794 DBUG_RETURN(1);
797 /* The last thread wakes the writer. */
798 if (!cshare->running_threads)
800 DBUG_PRINT("io_cache_share", ("waking writer"));
801 pthread_cond_signal(&cshare->cond_writer);
805 Readers wait until the data is copied from the writer. Another
806 reason to stop waiting is the removal of the write thread. If this
807 happens, we leave the lock with old data in the buffer.
809 while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
810 cshare->source_cache)
812 DBUG_PRINT("io_cache_share", ("reader waits in lock"));
813 pthread_cond_wait(&cshare->cond, &cshare->mutex);
817 If the writer was removed from the share while this thread was
818 asleep, we need to simulate an EOF condition. The writer cannot
819 reset the share variables as they might still be in use by readers
820 of the last block. When we awake here then because the last
821 joining thread signalled us. If the writer is not the last, it
822 will not signal. So it is safe to clear the buffer here.
824 if (!cshare->read_end || (cshare->pos_in_file < pos))
826 DBUG_PRINT("io_cache_share", ("reader found writer removed. EOF"));
827 cshare->read_end= cshare->buffer; /* Empty buffer. */
828 cshare->error= 0; /* EOF is not an error. */
831 else
834 There are read caches only. The last thread arriving in
835 lock_io_cache() continues with a locked cache and reads the block.
837 if (!cshare->running_threads)
839 DBUG_PRINT("io_cache_share", ("last thread joined, going to read"));
840 /* Stay locked. Leave the lock later by unlock_io_cache(). */
841 DBUG_RETURN(1);
845 All other threads wait until the requested block is read by the
846 last thread arriving. Another reason to stop waiting is the
847 removal of a thread. If this leads to all threads being in the
848 lock, we have to continue also. The first of the awaken threads
849 will then do the read.
851 while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
852 cshare->running_threads)
854 DBUG_PRINT("io_cache_share", ("reader waits in lock"));
855 pthread_cond_wait(&cshare->cond, &cshare->mutex);
858 /* If the block is not yet read, continue with a locked cache and read. */
859 if (!cshare->read_end || (cshare->pos_in_file < pos))
861 DBUG_PRINT("io_cache_share", ("reader awoke, going to read"));
862 /* Stay locked. Leave the lock later by unlock_io_cache(). */
863 DBUG_RETURN(1);
866 /* Another thread did read the block already. */
868 DBUG_PRINT("io_cache_share", ("reader awoke, going to process %u bytes",
869 (uint) (cshare->read_end ? (size_t)
870 (cshare->read_end - cshare->buffer) :
871 0)));
874 Leave the lock. Do not call unlock_io_cache() later. The thread that
875 filled the buffer did this and marked all threads as running.
877 pthread_mutex_unlock(&cshare->mutex);
878 DBUG_RETURN(0);
883 Unlock IO cache.
885 SYNOPSIS
886 unlock_io_cache()
887 cache The cache of the thread leaving the lock.
889 NOTE
890 This is called by the thread that filled the buffer. It marks all
891 threads as running and awakes them. This must not be done by any
892 other thread.
894 Do not signal cond_writer. Either there is no writer or the writer
895 is the only one who can call this function.
897 The reason for resetting running_threads to total_threads before
898 waking all other threads is that it could be possible that this
899 thread is so fast with processing the buffer that it enters the lock
900 before even one other thread has left it. If every awoken thread
901 would increase running_threads by one, this thread could think that
902 he is again the last to join and would not wait for the other
903 threads to process the data.
905 RETURN
906 void
909 static void unlock_io_cache(IO_CACHE *cache)
911 IO_CACHE_SHARE *cshare= cache->share;
912 DBUG_ENTER("unlock_io_cache");
913 DBUG_PRINT("io_cache_share", ("%s: 0x%lx pos: %lu running: %u",
914 (cache == cshare->source_cache) ?
915 "writer" : "reader",
916 (long) cache, (ulong) cshare->pos_in_file,
917 cshare->total_threads));
919 cshare->running_threads= cshare->total_threads;
920 pthread_cond_broadcast(&cshare->cond);
921 pthread_mutex_unlock(&cshare->mutex);
922 DBUG_VOID_RETURN;
927 Read from IO_CACHE when it is shared between several threads.
929 SYNOPSIS
930 _my_b_read_r()
931 cache IO_CACHE pointer
932 Buffer Buffer to retrieve count bytes from file
933 Count Number of bytes to read into Buffer
935 NOTE
936 This function is only called from the my_b_read() macro when there
937 isn't enough characters in the buffer to satisfy the request.
939 IMPLEMENTATION
941 It works as follows: when a thread tries to read from a file (that
942 is, after using all the data from the (shared) buffer), it just
943 hangs on lock_io_cache(), waiting for other threads. When the very
944 last thread attempts a read, lock_io_cache() returns 1, the thread
945 does actual IO and unlock_io_cache(), which signals all the waiting
946 threads that data is in the buffer.
948 WARNING
950 When changing this function, be careful with handling file offsets
951 (end-of_file, pos_in_file). Do not cast them to possibly smaller
952 types than my_off_t unless you can be sure that their value fits.
953 Same applies to differences of file offsets. (Bug #11527)
955 When changing this function, check _my_b_read(). It might need the
956 same change.
958 RETURN
959 0 we succeeded in reading all data
960 1 Error: can't read requested characters
963 int _my_b_read_r(register IO_CACHE *cache, uchar *Buffer, size_t Count)
965 my_off_t pos_in_file;
966 size_t length, diff_length, left_length;
967 IO_CACHE_SHARE *cshare= cache->share;
968 DBUG_ENTER("_my_b_read_r");
970 if ((left_length= (size_t) (cache->read_end - cache->read_pos)))
972 DBUG_ASSERT(Count >= left_length); /* User is not using my_b_read() */
973 memcpy(Buffer, cache->read_pos, left_length);
974 Buffer+= left_length;
975 Count-= left_length;
977 while (Count)
979 size_t cnt, len;
981 pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
982 diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
983 length=IO_ROUND_UP(Count+diff_length)-diff_length;
984 length= ((length <= cache->read_length) ?
985 length + IO_ROUND_DN(cache->read_length - length) :
986 length - IO_ROUND_UP(length - cache->read_length));
987 if (cache->type != READ_FIFO &&
988 (length > (cache->end_of_file - pos_in_file)))
989 length= (size_t) (cache->end_of_file - pos_in_file);
990 if (length == 0)
992 cache->error= (int) left_length;
993 DBUG_RETURN(1);
995 if (lock_io_cache(cache, pos_in_file))
997 /* With a synchronized write/read cache we won't come here... */
998 DBUG_ASSERT(!cshare->source_cache);
1000 ... unless the writer has gone before this thread entered the
1001 lock. Simulate EOF in this case. It can be distinguished by
1002 cache->file.
1004 if (cache->file < 0)
1005 len= 0;
1006 else
1009 Whenever a function which operates on IO_CACHE flushes/writes
1010 some part of the IO_CACHE to disk it will set the property
1011 "seek_not_done" to indicate this to other functions operating
1012 on the IO_CACHE.
1014 if (cache->seek_not_done)
1016 if (my_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0))
1017 == MY_FILEPOS_ERROR)
1019 cache->error= -1;
1020 unlock_io_cache(cache);
1021 DBUG_RETURN(1);
1024 len= my_read(cache->file, cache->buffer, length, cache->myflags);
1026 DBUG_PRINT("io_cache_share", ("read %lu bytes", (ulong) len));
1028 cache->read_end= cache->buffer + (len == (size_t) -1 ? 0 : len);
1029 cache->error= (len == length ? 0 : (int) len);
1030 cache->pos_in_file= pos_in_file;
1032 /* Copy important values to the share. */
1033 cshare->error= cache->error;
1034 cshare->read_end= cache->read_end;
1035 cshare->pos_in_file= pos_in_file;
1037 /* Mark all threads as running and wake them. */
1038 unlock_io_cache(cache);
1040 else
1043 With a synchronized write/read cache readers always come here.
1044 Copy important values from the share.
1046 cache->error= cshare->error;
1047 cache->read_end= cshare->read_end;
1048 cache->pos_in_file= cshare->pos_in_file;
1050 len= ((cache->error == -1) ? (size_t) -1 :
1051 (size_t) (cache->read_end - cache->buffer));
1053 cache->read_pos= cache->buffer;
1054 cache->seek_not_done= 0;
1055 if (len == 0 || len == (size_t) -1)
1057 DBUG_PRINT("io_cache_share", ("reader error. len %lu left %lu",
1058 (ulong) len, (ulong) left_length));
1059 cache->error= (int) left_length;
1060 DBUG_RETURN(1);
1062 cnt= (len > Count) ? Count : len;
1063 memcpy(Buffer, cache->read_pos, cnt);
1064 Count -= cnt;
1065 Buffer+= cnt;
1066 left_length+= cnt;
1067 cache->read_pos+= cnt;
1069 DBUG_RETURN(0);
1074 Copy data from write cache to read cache.
1076 SYNOPSIS
1077 copy_to_read_buffer()
1078 write_cache The write cache.
1079 write_buffer The source of data, mostly the cache buffer.
1080 write_length The number of bytes to copy.
1082 NOTE
1083 The write thread will wait for all read threads to join the cache
1084 lock. Then it copies the data over and wakes the read threads.
1086 RETURN
1087 void
1090 static void copy_to_read_buffer(IO_CACHE *write_cache,
1091 const uchar *write_buffer, size_t write_length)
1093 IO_CACHE_SHARE *cshare= write_cache->share;
1095 DBUG_ASSERT(cshare->source_cache == write_cache);
1097 write_length is usually less or equal to buffer_length.
1098 It can be bigger if _my_b_write() is called with a big length.
1100 while (write_length)
1102 size_t copy_length= min(write_length, write_cache->buffer_length);
1103 int __attribute__((unused)) rc;
1105 rc= lock_io_cache(write_cache, write_cache->pos_in_file);
1106 /* The writing thread does always have the lock when it awakes. */
1107 DBUG_ASSERT(rc);
1109 memcpy(cshare->buffer, write_buffer, copy_length);
1111 cshare->error= 0;
1112 cshare->read_end= cshare->buffer + copy_length;
1113 cshare->pos_in_file= write_cache->pos_in_file;
1115 /* Mark all threads as running and wake them. */
1116 unlock_io_cache(write_cache);
1118 write_buffer+= copy_length;
1119 write_length-= copy_length;
1122 #endif /*THREAD*/
1126 Do sequential read from the SEQ_READ_APPEND cache.
1128 We do this in three stages:
1129 - first read from info->buffer
1130 - then if there are still data to read, try the file descriptor
1131 - afterwards, if there are still data to read, try append buffer
1133 RETURNS
1134 0 Success
1135 1 Failed to read
1138 int _my_b_seq_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
1140 size_t length, diff_length, left_length, save_count, max_length;
1141 my_off_t pos_in_file;
1142 save_count=Count;
1144 /* first, read the regular buffer */
1145 if ((left_length=(size_t) (info->read_end-info->read_pos)))
1147 DBUG_ASSERT(Count > left_length); /* User is not using my_b_read() */
1148 memcpy(Buffer,info->read_pos, left_length);
1149 Buffer+=left_length;
1150 Count-=left_length;
1152 lock_append_buffer(info);
1154 /* pos_in_file always point on where info->buffer was read */
1155 if ((pos_in_file=info->pos_in_file +
1156 (size_t) (info->read_end - info->buffer)) >= info->end_of_file)
1157 goto read_append_buffer;
1160 With read-append cache we must always do a seek before we read,
1161 because the write could have moved the file pointer astray
1163 if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) == MY_FILEPOS_ERROR)
1165 info->error= -1;
1166 unlock_append_buffer(info);
1167 return (1);
1169 info->seek_not_done=0;
1171 diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
1173 /* now the second stage begins - read from file descriptor */
1174 if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
1176 /* Fill first intern buffer */
1177 size_t read_length;
1179 length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
1180 if ((read_length= my_read(info->file,Buffer, length,
1181 info->myflags)) == (size_t) -1)
1183 info->error= -1;
1184 unlock_append_buffer(info);
1185 return 1;
1187 Count-=read_length;
1188 Buffer+=read_length;
1189 pos_in_file+=read_length;
1191 if (read_length != length)
1194 We only got part of data; Read the rest of the data from the
1195 write buffer
1197 goto read_append_buffer;
1199 left_length+=length;
1200 diff_length=0;
1203 max_length= info->read_length-diff_length;
1204 if (max_length > (info->end_of_file - pos_in_file))
1205 max_length= (size_t) (info->end_of_file - pos_in_file);
1206 if (!max_length)
1208 if (Count)
1209 goto read_append_buffer;
1210 length=0; /* Didn't read any more chars */
1212 else
1214 length= my_read(info->file,info->buffer, max_length, info->myflags);
1215 if (length == (size_t) -1)
1217 info->error= -1;
1218 unlock_append_buffer(info);
1219 return 1;
1221 if (length < Count)
1223 memcpy(Buffer, info->buffer, length);
1224 Count -= length;
1225 Buffer += length;
1228 added the line below to make
1229 DBUG_ASSERT(pos_in_file==info->end_of_file) pass.
1230 otherwise this does not appear to be needed
1232 pos_in_file += length;
1233 goto read_append_buffer;
1236 unlock_append_buffer(info);
1237 info->read_pos=info->buffer+Count;
1238 info->read_end=info->buffer+length;
1239 info->pos_in_file=pos_in_file;
1240 memcpy(Buffer,info->buffer,(size_t) Count);
1241 return 0;
1243 read_append_buffer:
1246 Read data from the current write buffer.
1247 Count should never be == 0 here (The code will work even if count is 0)
1251 /* First copy the data to Count */
1252 size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
1253 size_t copy_len;
1254 size_t transfer_len;
1256 DBUG_ASSERT(info->append_read_pos <= info->write_pos);
1258 TODO: figure out if the assert below is needed or correct.
1260 DBUG_ASSERT(pos_in_file == info->end_of_file);
1261 copy_len=min(Count, len_in_buff);
1262 memcpy(Buffer, info->append_read_pos, copy_len);
1263 info->append_read_pos += copy_len;
1264 Count -= copy_len;
1265 if (Count)
1266 info->error = save_count - Count;
1268 /* Fill read buffer with data from write buffer */
1269 memcpy(info->buffer, info->append_read_pos,
1270 (size_t) (transfer_len=len_in_buff - copy_len));
1271 info->read_pos= info->buffer;
1272 info->read_end= info->buffer+transfer_len;
1273 info->append_read_pos=info->write_pos;
1274 info->pos_in_file=pos_in_file+copy_len;
1275 info->end_of_file+=len_in_buff;
1277 unlock_append_buffer(info);
1278 return Count ? 1 : 0;
1282 #ifdef HAVE_AIOWAIT
1285 Read from the IO_CACHE into a buffer and feed asynchronously
1286 from disk when needed.
1288 SYNOPSIS
1289 _my_b_async_read()
1290 info IO_CACHE pointer
1291 Buffer Buffer to retrieve count bytes from file
1292 Count Number of bytes to read into Buffer
1294 RETURN VALUE
1295 -1 An error has occurred; my_errno is set.
1296 0 Success
1297 1 An error has occurred; IO_CACHE to error state.
1300 int _my_b_async_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
1302 size_t length,read_length,diff_length,left_length,use_length,org_Count;
1303 size_t max_length;
1304 my_off_t next_pos_in_file;
1305 uchar *read_buffer;
1307 memcpy(Buffer,info->read_pos,
1308 (left_length= (size_t) (info->read_end-info->read_pos)));
1309 Buffer+=left_length;
1310 org_Count=Count;
1311 Count-=left_length;
1313 if (info->inited)
1314 { /* wait for read block */
1315 info->inited=0; /* No more block to read */
1316 my_aiowait(&info->aio_result); /* Wait for outstanding req */
1317 if (info->aio_result.result.aio_errno)
1319 if (info->myflags & MY_WME)
1320 my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
1321 my_filename(info->file),
1322 info->aio_result.result.aio_errno);
1323 my_errno=info->aio_result.result.aio_errno;
1324 info->error= -1;
1325 return(1);
1327 if (! (read_length= (size_t) info->aio_result.result.aio_return) ||
1328 read_length == (size_t) -1)
1330 my_errno=0; /* For testing */
1331 info->error= (read_length == (size_t) -1 ? -1 :
1332 (int) (read_length+left_length));
1333 return(1);
1335 info->pos_in_file+= (size_t) (info->read_end - info->request_pos);
1337 if (info->request_pos != info->buffer)
1338 info->request_pos=info->buffer;
1339 else
1340 info->request_pos=info->buffer+info->read_length;
1341 info->read_pos=info->request_pos;
1342 next_pos_in_file=info->aio_read_pos+read_length;
1344 /* Check if pos_in_file is changed
1345 (_ni_read_cache may have skipped some bytes) */
1347 if (info->aio_read_pos < info->pos_in_file)
1348 { /* Fix if skipped bytes */
1349 if (info->aio_read_pos + read_length < info->pos_in_file)
1351 read_length=0; /* Skip block */
1352 next_pos_in_file=info->pos_in_file;
1354 else
1356 my_off_t offset= (info->pos_in_file - info->aio_read_pos);
1357 info->pos_in_file=info->aio_read_pos; /* Whe are here */
1358 info->read_pos=info->request_pos+offset;
1359 read_length-=offset; /* Bytes left from read_pos */
1362 #ifndef DBUG_OFF
1363 if (info->aio_read_pos > info->pos_in_file)
1365 my_errno=EINVAL;
1366 return(info->read_length= (size_t) -1);
1368 #endif
1369 /* Copy found bytes to buffer */
1370 length=min(Count,read_length);
1371 memcpy(Buffer,info->read_pos,(size_t) length);
1372 Buffer+=length;
1373 Count-=length;
1374 left_length+=length;
1375 info->read_end=info->rc_pos+read_length;
1376 info->read_pos+=length;
1378 else
1379 next_pos_in_file=(info->pos_in_file+ (size_t)
1380 (info->read_end - info->request_pos));
1382 /* If reading large blocks, or first read or read with skip */
1383 if (Count)
1385 if (next_pos_in_file == info->end_of_file)
1387 info->error=(int) (read_length+left_length);
1388 return 1;
1391 if (my_seek(info->file,next_pos_in_file,MY_SEEK_SET,MYF(0))
1392 == MY_FILEPOS_ERROR)
1394 info->error= -1;
1395 return (1);
1398 read_length=IO_SIZE*2- (size_t) (next_pos_in_file & (IO_SIZE-1));
1399 if (Count < read_length)
1400 { /* Small block, read to cache */
1401 if ((read_length=my_read(info->file,info->request_pos,
1402 read_length, info->myflags)) == (size_t) -1)
1403 return info->error= -1;
1404 use_length=min(Count,read_length);
1405 memcpy(Buffer,info->request_pos,(size_t) use_length);
1406 info->read_pos=info->request_pos+Count;
1407 info->read_end=info->request_pos+read_length;
1408 info->pos_in_file=next_pos_in_file; /* Start of block in cache */
1409 next_pos_in_file+=read_length;
1411 if (Count != use_length)
1412 { /* Didn't find hole block */
1413 if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
1414 my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG),
1415 my_filename(info->file),my_errno);
1416 info->error=(int) (read_length+left_length);
1417 return 1;
1420 else
1421 { /* Big block, don't cache it */
1422 if ((read_length= my_read(info->file,Buffer, Count,info->myflags))
1423 != Count)
1425 info->error= read_length == (size_t) -1 ? -1 : read_length+left_length;
1426 return 1;
1428 info->read_pos=info->read_end=info->request_pos;
1429 info->pos_in_file=(next_pos_in_file+=Count);
1433 /* Read next block with asyncronic io */
1434 diff_length=(next_pos_in_file & (IO_SIZE-1));
1435 max_length= info->read_length - diff_length;
1436 if (max_length > info->end_of_file - next_pos_in_file)
1437 max_length= (size_t) (info->end_of_file - next_pos_in_file);
1439 if (info->request_pos != info->buffer)
1440 read_buffer=info->buffer;
1441 else
1442 read_buffer=info->buffer+info->read_length;
1443 info->aio_read_pos=next_pos_in_file;
1444 if (max_length)
1446 info->aio_result.result.aio_errno=AIO_INPROGRESS; /* Marker for test */
1447 DBUG_PRINT("aioread",("filepos: %ld length: %lu",
1448 (ulong) next_pos_in_file, (ulong) max_length));
1449 if (aioread(info->file,read_buffer, max_length,
1450 (my_off_t) next_pos_in_file,MY_SEEK_SET,
1451 &info->aio_result.result))
1452 { /* Skip async io */
1453 my_errno=errno;
1454 DBUG_PRINT("error",("got error: %d, aio_result: %d from aioread, async skipped",
1455 errno, info->aio_result.result.aio_errno));
1456 if (info->request_pos != info->buffer)
1458 bmove(info->buffer,info->request_pos,
1459 (size_t) (info->read_end - info->read_pos));
1460 info->request_pos=info->buffer;
1461 info->read_pos-=info->read_length;
1462 info->read_end-=info->read_length;
1464 info->read_length=info->buffer_length; /* Use hole buffer */
1465 info->read_function=_my_b_read; /* Use normal IO_READ next */
1467 else
1468 info->inited=info->aio_result.pending=1;
1470 return 0; /* Block read, async in use */
1471 } /* _my_b_async_read */
1472 #endif
1475 /* Read one byte when buffer is empty */
1477 int _my_b_get(IO_CACHE *info)
1479 uchar buff;
1480 IO_CACHE_CALLBACK pre_read,post_read;
1481 if ((pre_read = info->pre_read))
1482 (*pre_read)(info);
1483 if ((*(info)->read_function)(info,&buff,1))
1484 return my_b_EOF;
1485 if ((post_read = info->post_read))
1486 (*post_read)(info);
1487 return (int) (uchar) buff;
1491 Write a byte buffer to IO_CACHE and flush to disk
1492 if IO_CACHE is full.
1494 RETURN VALUE
1495 1 On error on write
1496 0 On success
1497 -1 On error; my_errno contains error code.
1500 int _my_b_write(register IO_CACHE *info, const uchar *Buffer, size_t Count)
1502 size_t rest_length,length;
1504 if (info->pos_in_file+info->buffer_length > info->end_of_file)
1506 my_errno=errno=EFBIG;
1507 return info->error = -1;
1510 rest_length= (size_t) (info->write_end - info->write_pos);
1511 memcpy(info->write_pos,Buffer,(size_t) rest_length);
1512 Buffer+=rest_length;
1513 Count-=rest_length;
1514 info->write_pos+=rest_length;
1516 if (my_b_flush_io_cache(info,1))
1517 return 1;
1518 if (Count >= IO_SIZE)
1519 { /* Fill first intern buffer */
1520 length=Count & (size_t) ~(IO_SIZE-1);
1521 if (info->seek_not_done)
1524 Whenever a function which operates on IO_CACHE flushes/writes
1525 some part of the IO_CACHE to disk it will set the property
1526 "seek_not_done" to indicate this to other functions operating
1527 on the IO_CACHE.
1529 if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)))
1531 info->error= -1;
1532 return (1);
1534 info->seek_not_done=0;
1536 if (my_write(info->file, Buffer, length, info->myflags | MY_NABP))
1537 return info->error= -1;
1539 #ifdef THREAD
1541 In case of a shared I/O cache with a writer we normally do direct
1542 write cache to read cache copy. Simulate this here by direct
1543 caller buffer to read cache copy. Do it after the write so that
1544 the cache readers actions on the flushed part can go in parallel
1545 with the write of the extra stuff. copy_to_read_buffer()
1546 synchronizes writer and readers so that after this call the
1547 readers can act on the extra stuff while the writer can go ahead
1548 and prepare the next output. copy_to_read_buffer() relies on
1549 info->pos_in_file.
1551 if (info->share)
1552 copy_to_read_buffer(info, Buffer, length);
1553 #endif
1555 Count-=length;
1556 Buffer+=length;
1557 info->pos_in_file+=length;
1559 memcpy(info->write_pos,Buffer,(size_t) Count);
1560 info->write_pos+=Count;
1561 return 0;
1566 Append a block to the write buffer.
1567 This is done with the buffer locked to ensure that we don't read from
1568 the write buffer before we are ready with it.
1571 int my_b_append(register IO_CACHE *info, const uchar *Buffer, size_t Count)
1573 size_t rest_length,length;
1575 #ifdef THREAD
1577 Assert that we cannot come here with a shared cache. If we do one
1578 day, we might need to add a call to copy_to_read_buffer().
1580 DBUG_ASSERT(!info->share);
1581 #endif
1583 lock_append_buffer(info);
1584 rest_length= (size_t) (info->write_end - info->write_pos);
1585 if (Count <= rest_length)
1586 goto end;
1587 memcpy(info->write_pos, Buffer, rest_length);
1588 Buffer+=rest_length;
1589 Count-=rest_length;
1590 info->write_pos+=rest_length;
1591 if (my_b_flush_io_cache(info,0))
1593 unlock_append_buffer(info);
1594 return 1;
1596 if (Count >= IO_SIZE)
1597 { /* Fill first intern buffer */
1598 length=Count & (size_t) ~(IO_SIZE-1);
1599 if (my_write(info->file,Buffer, length, info->myflags | MY_NABP))
1601 unlock_append_buffer(info);
1602 return info->error= -1;
1604 Count-=length;
1605 Buffer+=length;
1606 info->end_of_file+=length;
1609 end:
1610 memcpy(info->write_pos,Buffer,(size_t) Count);
1611 info->write_pos+=Count;
1612 unlock_append_buffer(info);
1613 return 0;
1617 int my_b_safe_write(IO_CACHE *info, const uchar *Buffer, size_t Count)
1620 Sasha: We are not writing this with the ? operator to avoid hitting
1621 a possible compiler bug. At least gcc 2.95 cannot deal with
1622 several layers of ternary operators that evaluated comma(,) operator
1623 expressions inside - I do have a test case if somebody wants it
1625 if (info->type == SEQ_READ_APPEND)
1626 return my_b_append(info, Buffer, Count);
1627 return my_b_write(info, Buffer, Count);
1632 Write a block to disk where part of the data may be inside the record
1633 buffer. As all write calls to the data goes through the cache,
1634 we will never get a seek over the end of the buffer
1637 int my_block_write(register IO_CACHE *info, const uchar *Buffer, size_t Count,
1638 my_off_t pos)
1640 size_t length;
1641 int error=0;
1643 #ifdef THREAD
1645 Assert that we cannot come here with a shared cache. If we do one
1646 day, we might need to add a call to copy_to_read_buffer().
1648 DBUG_ASSERT(!info->share);
1649 #endif
1651 if (pos < info->pos_in_file)
1653 /* Of no overlap, write everything without buffering */
1654 if (pos + Count <= info->pos_in_file)
1655 return my_pwrite(info->file, Buffer, Count, pos,
1656 info->myflags | MY_NABP);
1657 /* Write the part of the block that is before buffer */
1658 length= (uint) (info->pos_in_file - pos);
1659 if (my_pwrite(info->file, Buffer, length, pos, info->myflags | MY_NABP))
1660 info->error= error= -1;
1661 Buffer+=length;
1662 pos+= length;
1663 Count-= length;
1664 #ifndef HAVE_PREAD
1665 info->seek_not_done=1;
1666 #endif
1669 /* Check if we want to write inside the used part of the buffer.*/
1670 length= (size_t) (info->write_end - info->buffer);
1671 if (pos < info->pos_in_file + length)
1673 size_t offset= (size_t) (pos - info->pos_in_file);
1674 length-=offset;
1675 if (length > Count)
1676 length=Count;
1677 memcpy(info->buffer+offset, Buffer, length);
1678 Buffer+=length;
1679 Count-= length;
1680 /* Fix length of buffer if the new data was larger */
1681 if (info->buffer+length > info->write_pos)
1682 info->write_pos=info->buffer+length;
1683 if (!Count)
1684 return (error);
1686 /* Write at the end of the current buffer; This is the normal case */
1687 if (_my_b_write(info, Buffer, Count))
1688 error= -1;
1689 return error;
1693 /* Flush write cache */
1695 #ifdef THREAD
1696 #define LOCK_APPEND_BUFFER if (need_append_buffer_lock) \
1697 lock_append_buffer(info);
1698 #define UNLOCK_APPEND_BUFFER if (need_append_buffer_lock) \
1699 unlock_append_buffer(info);
1700 #else
1701 #define LOCK_APPEND_BUFFER
1702 #define UNLOCK_APPEND_BUFFER
1703 #endif
1706 int my_b_flush_io_cache(IO_CACHE *info,
1707 int need_append_buffer_lock __attribute__((unused)))
1709 size_t length;
1710 my_off_t pos_in_file;
1711 my_bool append_cache= (info->type == SEQ_READ_APPEND);
1712 DBUG_ENTER("my_b_flush_io_cache");
1713 DBUG_PRINT("enter", ("cache: 0x%lx", (long) info));
1715 #ifdef THREAD
1716 if (!append_cache)
1717 need_append_buffer_lock= 0;
1718 #endif
1720 if (info->type == WRITE_CACHE || append_cache)
1722 if (info->file == -1)
1724 if (real_open_cached_file(info))
1725 DBUG_RETURN((info->error= -1));
1727 LOCK_APPEND_BUFFER;
1729 if ((length=(size_t) (info->write_pos - info->write_buffer)))
1731 #ifdef THREAD
1733 In case of a shared I/O cache with a writer we do direct write
1734 cache to read cache copy. Do it before the write here so that
1735 the readers can work in parallel with the write.
1736 copy_to_read_buffer() relies on info->pos_in_file.
1738 if (info->share)
1739 copy_to_read_buffer(info, info->write_buffer, length);
1740 #endif
1742 pos_in_file=info->pos_in_file;
1744 If we have append cache, we always open the file with
1745 O_APPEND which moves the pos to EOF automatically on every write
1747 if (!append_cache && info->seek_not_done)
1748 { /* File touched, do seek */
1749 if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) ==
1750 MY_FILEPOS_ERROR)
1752 UNLOCK_APPEND_BUFFER;
1753 DBUG_RETURN((info->error= -1));
1755 if (!append_cache)
1756 info->seek_not_done=0;
1758 if (!append_cache)
1759 info->pos_in_file+=length;
1760 info->write_end= (info->write_buffer+info->buffer_length-
1761 ((pos_in_file+length) & (IO_SIZE-1)));
1763 if (my_write(info->file,info->write_buffer,length,
1764 info->myflags | MY_NABP))
1765 info->error= -1;
1766 else
1767 info->error= 0;
1768 if (!append_cache)
1770 set_if_bigger(info->end_of_file,(pos_in_file+length));
1772 else
1774 info->end_of_file+=(info->write_pos-info->append_read_pos);
1775 DBUG_ASSERT(info->end_of_file == my_tell(info->file,MYF(0)));
1778 info->append_read_pos=info->write_pos=info->write_buffer;
1779 ++info->disk_writes;
1780 UNLOCK_APPEND_BUFFER;
1781 DBUG_RETURN(info->error);
1784 #ifdef HAVE_AIOWAIT
1785 else if (info->type != READ_NET)
1787 my_aiowait(&info->aio_result); /* Wait for outstanding req */
1788 info->inited=0;
1790 #endif
1791 UNLOCK_APPEND_BUFFER;
1792 DBUG_RETURN(0);
1796 Free an IO_CACHE object
1798 SYNOPSOS
1799 end_io_cache()
1800 info IO_CACHE Handle to free
1802 NOTES
1803 It's currently safe to call this if one has called init_io_cache()
1804 on the 'info' object, even if init_io_cache() failed.
1805 This function is also safe to call twice with the same handle.
1807 RETURN
1808 0 ok
1809 # Error
1812 int end_io_cache(IO_CACHE *info)
1814 int error=0;
1815 IO_CACHE_CALLBACK pre_close;
1816 DBUG_ENTER("end_io_cache");
1817 DBUG_PRINT("enter",("cache: 0x%lx", (ulong) info));
1819 #ifdef THREAD
1821 Every thread must call remove_io_thread(). The last one destroys
1822 the share elements.
1824 DBUG_ASSERT(!info->share || !info->share->total_threads);
1825 #endif
1827 if ((pre_close=info->pre_close))
1829 (*pre_close)(info);
1830 info->pre_close= 0;
1832 if (info->alloced_buffer)
1834 info->alloced_buffer=0;
1835 if (info->file != -1) /* File doesn't exist */
1836 error= my_b_flush_io_cache(info,1);
1837 my_free((uchar*) info->buffer,MYF(MY_WME));
1838 info->buffer=info->read_pos=(uchar*) 0;
1840 if (info->type == SEQ_READ_APPEND)
1842 /* Destroy allocated mutex */
1843 info->type= TYPE_NOT_SET;
1844 #ifdef THREAD
1845 pthread_mutex_destroy(&info->append_buffer_lock);
1846 #endif
1848 DBUG_RETURN(error);
1849 } /* end_io_cache */
1852 /**********************************************************************
1853 Testing of MF_IOCACHE
1854 **********************************************************************/
1856 #ifdef MAIN
1858 #include <my_dir.h>
/*
  Print "Error:" followed by the formatted message and the current errno
  to stderr, then terminate the program with exit status 1.

  errno is saved on entry because the stdio calls made while printing
  the message may change it before it is reported.
*/

void die(const char* fmt, ...)
{
  int saved_errno= errno;
  va_list va_args;

  va_start(va_args,fmt);
  fprintf(stderr,"Error:");
  vfprintf(stderr, fmt,va_args);
  va_end(va_args);
  fprintf(stderr,", errno=%d\n", saved_errno);
  exit(1);
}
1870 int open_file(const char* fname, IO_CACHE* info, int cache_size)
1872 int fd;
1873 if ((fd=my_open(fname,O_CREAT | O_RDWR,MYF(MY_WME))) < 0)
1874 die("Could not open %s", fname);
1875 if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
1876 die("failed in init_io_cache()");
1877 return fd;
1880 void close_file(IO_CACHE* info)
1882 end_io_cache(info);
1883 my_close(info->file, MYF(MY_WME));
1886 int main(int argc, char** argv)
1888 IO_CACHE sra_cache; /* SEQ_READ_APPEND */
1889 MY_STAT status;
1890 const char* fname="/tmp/iocache.test";
1891 int cache_size=16384;
1892 char llstr_buf[22];
1893 int max_block,total_bytes=0;
1894 int i,num_loops=100,error=0;
1895 char *p;
1896 char* block, *block_end;
1897 MY_INIT(argv[0]);
1898 max_block = cache_size*3;
1899 if (!(block=(char*)my_malloc(max_block,MYF(MY_WME))))
1900 die("Not enough memory to allocate test block");
1901 block_end = block + max_block;
1902 for (p = block,i=0; p < block_end;i++)
1904 *p++ = (char)i;
1906 if (my_stat(fname,&status, MYF(0)) &&
1907 my_delete(fname,MYF(MY_WME)))
1909 die("Delete of %s failed, aborting", fname);
1911 open_file(fname,&sra_cache, cache_size);
1912 for (i = 0; i < num_loops; i++)
1914 char buf[4];
1915 int block_size = abs(rand() % max_block);
1916 int4store(buf, block_size);
1917 if (my_b_append(&sra_cache,buf,4) ||
1918 my_b_append(&sra_cache, block, block_size))
1919 die("write failed");
1920 total_bytes += 4+block_size;
1922 close_file(&sra_cache);
1923 my_free(block,MYF(MY_WME));
1924 if (!my_stat(fname,&status,MYF(MY_WME)))
1925 die("%s failed to stat, but I had just closed it,\
1926 wonder how that happened");
1927 printf("Final size of %s is %s, wrote %d bytes\n",fname,
1928 llstr(status.st_size,llstr_buf),
1929 total_bytes);
1930 my_delete(fname, MYF(MY_WME));
1931 /* check correctness of tests */
1932 if (total_bytes != status.st_size)
1934 fprintf(stderr,"Not the same number of bytes acutally in file as bytes \
1935 supposedly written\n");
1936 error=1;
1938 exit(error);
1939 return 0;
1941 #endif