mySQL 5.0.11 sources for tomato
[tomato.git] / release / src / router / mysql / storage / ndb / src / kernel / blocks / ndbfs / AsyncFile.cpp
blob841ed49e36fb2135d4a6be888ffcaaaefd5ecb86
1 /* Copyright (c) 2003-2007 MySQL AB, 2009 Sun Microsystems, Inc.
2 Use is subject to license terms.
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
17 #include <ndb_global.h>
18 #include <my_sys.h>
19 #include <my_pthread.h>
21 #ifdef HAVE_XFS_XFS_H
22 #include <xfs/xfs.h>
23 #endif
25 #include "AsyncFile.hpp"
27 #include <ErrorHandlingMacros.hpp>
28 #include <kernel_types.h>
29 #include <ndbd_malloc.hpp>
30 #include <NdbThread.h>
31 #include <signaldata/FsRef.hpp>
32 #include <signaldata/FsOpenReq.hpp>
33 #include <signaldata/FsReadWriteReq.hpp>
35 // use this to test broken pread code
36 //#define HAVE_BROKEN_PREAD
38 #ifdef HAVE_BROKEN_PREAD
39 #undef HAVE_PWRITE
40 #undef HAVE_PREAD
41 #endif
43 #if defined NDB_WIN32
44 #else
45 // For readv and writev
46 #include <sys/uio.h>
47 #endif
49 #ifndef NDB_WIN32
50 #include <dirent.h>
51 #endif
53 // Use this define if you want printouts from AsyncFile class
54 //#define DEBUG_ASYNCFILE
56 #ifdef DEBUG_ASYNCFILE
57 #include <NdbOut.hpp>
58 #define DEBUG(x) x
59 #define PRINT_ERRORANDFLAGS(f) printErrorAndFlags(f)
60 void printErrorAndFlags(Uint32 used_flags);
61 #else
62 #define DEBUG(x)
63 #define PRINT_ERRORANDFLAGS(f)
64 #endif
66 // Define the size of the write buffer (for each thread)
67 #define WRITEBUFFERSIZE 262144
// Printable names for request actions.
// NOTE(review): presumably indexed by the Request action enumeration, which
// is declared outside this file - confirm the order and length against it.
const char *actionName[] = {
  "open",  "close",  "closeRemove",
  "read",  "readv",
  "write", "writev", "writeSync", "writevSync",
  "sync",
  "end"
};

// Counter bumped by AsyncFile::doStart(); used to build the thread name.
static int numAsyncFiles = 0;
84 extern "C" void * runAsyncFile(void* arg)
86 ((AsyncFile*)arg)->run();
87 return (NULL);
90 AsyncFile::AsyncFile(SimulatedBlock& fs) :
91 theFileName(),
92 #ifdef NDB_WIN32
93 hFile(INVALID_HANDLE_VALUE),
94 #else
95 theFd(-1),
96 #endif
97 theReportTo(0),
98 theMemoryChannelPtr(NULL),
99 m_fs(fs)
101 m_page_ptr.setNull();
102 m_current_request= m_last_request= 0;
103 m_open_flags = 0;
106 void
107 AsyncFile::doStart()
109 // Stacksize for filesystem threads
110 #if !defined(DBUG_OFF) && defined (__hpux)
111 // Empirical evidence indicates at least 32k
112 const NDB_THREAD_STACKSIZE stackSize = 32768;
113 #else
114 // Otherwise an 8k stack should be enough
115 const NDB_THREAD_STACKSIZE stackSize = 8192;
116 #endif
118 char buf[16];
119 numAsyncFiles++;
120 BaseString::snprintf(buf, sizeof(buf), "AsyncFile%d", numAsyncFiles);
122 theStartMutexPtr = NdbMutex_Create();
123 theStartConditionPtr = NdbCondition_Create();
124 NdbMutex_Lock(theStartMutexPtr);
125 theStartFlag = false;
126 theThreadPtr = NdbThread_Create(runAsyncFile,
127 (void**)this,
128 stackSize,
129 (char*)&buf,
130 NDB_THREAD_PRIO_MEAN);
131 if (theThreadPtr == 0)
132 ERROR_SET(fatal, NDBD_EXIT_MEMALLOC, "","Could not allocate file system thread");
134 NdbCondition_Wait(theStartConditionPtr,
135 theStartMutexPtr);
136 NdbMutex_Unlock(theStartMutexPtr);
137 NdbMutex_Destroy(theStartMutexPtr);
138 NdbCondition_Destroy(theStartConditionPtr);
141 AsyncFile::~AsyncFile()
143 void *status;
144 Request request;
145 request.action = Request::end;
146 theMemoryChannelPtr->writeChannel( &request );
147 NdbThread_WaitFor(theThreadPtr, &status);
148 NdbThread_Destroy(&theThreadPtr);
149 delete theMemoryChannelPtr;
152 void
153 AsyncFile::reportTo( MemoryChannel<Request> *reportTo )
155 theReportTo = reportTo;
158 void AsyncFile::execute(Request* request)
160 theMemoryChannelPtr->writeChannel( request );
163 void
164 AsyncFile::run()
166 Request *request;
167 // Create theMemoryChannel in the thread that will wait for it
168 NdbMutex_Lock(theStartMutexPtr);
169 theMemoryChannelPtr = new MemoryChannel<Request>();
170 theStartFlag = true;
171 // Create write buffer for bigger writes
172 theWriteBufferSize = WRITEBUFFERSIZE;
173 theWriteBufferUnaligned = (char *) ndbd_malloc(theWriteBufferSize +
174 NDB_O_DIRECT_WRITE_ALIGNMENT-1);
175 theWriteBuffer = (char *)
176 (((UintPtr)theWriteBufferUnaligned + NDB_O_DIRECT_WRITE_ALIGNMENT - 1) &
177 ~(UintPtr)(NDB_O_DIRECT_WRITE_ALIGNMENT - 1));
179 NdbMutex_Unlock(theStartMutexPtr);
180 NdbCondition_Signal(theStartConditionPtr);
182 if (!theWriteBuffer) {
183 DEBUG(ndbout_c("AsyncFile::writeReq, Failed allocating write buffer"));
184 return;
185 }//if
187 while (1) {
188 request = theMemoryChannelPtr->readChannel();
189 if (!request) {
190 DEBUG(ndbout_c("Nothing read from Memory Channel in AsyncFile"));
191 endReq();
192 return;
193 }//if
194 m_current_request= request;
195 switch (request->action) {
196 case Request:: open:
197 openReq(request);
198 break;
199 case Request:: close:
200 closeReq(request);
201 break;
202 case Request:: closeRemove:
203 closeReq(request);
204 removeReq(request);
205 break;
206 case Request:: readPartial:
207 case Request:: read:
208 readReq(request);
209 break;
210 case Request:: readv:
211 readvReq(request);
212 break;
213 case Request:: write:
214 writeReq(request);
215 break;
216 case Request:: writev:
217 writevReq(request);
218 break;
219 case Request:: writeSync:
220 writeReq(request);
221 syncReq(request);
222 break;
223 case Request:: writevSync:
224 writevReq(request);
225 syncReq(request);
226 break;
227 case Request:: sync:
228 syncReq(request);
229 break;
230 case Request:: append:
231 appendReq(request);
232 break;
233 case Request:: append_synch:
234 appendReq(request);
235 syncReq(request);
236 break;
237 case Request::rmrf:
238 rmrfReq(request, (char*)theFileName.c_str(), request->par.rmrf.own_directory);
239 break;
240 case Request:: end:
241 if (theFd > 0)
242 closeReq(request);
243 endReq();
244 return;
245 default:
246 DEBUG(ndbout_c("Invalid Request"));
247 abort();
248 break;
249 }//switch
250 m_last_request= request;
251 m_current_request= 0;
253 // No need to signal as ndbfs only uses tryRead
254 theReportTo->writeChannelNoSignal(request);
255 }//while
256 }//AsyncFile::run()
258 #ifdef O_DIRECT
259 static char g_odirect_readbuf[2*GLOBAL_PAGE_SIZE -1];
260 #endif
263 AsyncFile::check_odirect_write(Uint32 flags, int& new_flags, int mode)
265 assert(new_flags & (O_CREAT | O_TRUNC));
266 #ifdef O_DIRECT
267 int ret;
268 char * bufptr = (char*)((UintPtr(g_odirect_readbuf)+(GLOBAL_PAGE_SIZE - 1)) & ~(GLOBAL_PAGE_SIZE - 1));
269 while (((ret = ::write(theFd, bufptr, GLOBAL_PAGE_SIZE)) == -1) &&
270 (errno == EINTR));
271 if (ret == -1)
273 new_flags &= ~O_DIRECT;
274 ndbout_c("%s Failed to write using O_DIRECT, disabling",
275 theFileName.c_str());
278 close(theFd);
279 theFd = ::open(theFileName.c_str(), new_flags, mode);
280 if (theFd == -1)
281 return errno;
282 #endif
284 return 0;
288 AsyncFile::check_odirect_read(Uint32 flags, int &new_flags, int mode)
290 #ifdef O_DIRECT
291 int ret;
292 char * bufptr = (char*)((UintPtr(g_odirect_readbuf)+(GLOBAL_PAGE_SIZE - 1)) & ~(GLOBAL_PAGE_SIZE - 1));
293 while (((ret = ::read(theFd, bufptr, GLOBAL_PAGE_SIZE)) == -1) &&
294 (errno == EINTR));
295 if (ret == -1)
297 ndbout_c("%s Failed to read using O_DIRECT, disabling",
298 theFileName.c_str());
299 goto reopen;
302 if(lseek(theFd, 0, SEEK_SET) != 0)
304 return errno;
307 if ((flags & FsOpenReq::OM_CHECK_SIZE) == 0)
309 struct stat buf;
310 if ((fstat(theFd, &buf) == -1))
312 return errno;
314 else if ((buf.st_size % GLOBAL_PAGE_SIZE) != 0)
316 ndbout_c("%s filesize not a multiple of %d, disabling O_DIRECT",
317 theFileName.c_str(), GLOBAL_PAGE_SIZE);
318 goto reopen;
322 return 0;
324 reopen:
325 close(theFd);
326 new_flags &= ~O_DIRECT;
327 theFd = ::open(theFileName.c_str(), new_flags, mode);
328 if (theFd == -1)
329 return errno;
330 #endif
331 return 0;
334 void AsyncFile::openReq(Request* request)
336 m_auto_sync_freq = 0;
337 m_write_wo_sync = 0;
338 m_open_flags = request->par.open.flags;
340 // for open.flags, see signal FSOPENREQ
341 #ifdef NDB_WIN32
342 DWORD dwCreationDisposition;
343 DWORD dwDesiredAccess = 0;
344 DWORD dwShareMode = FILE_SHARE_READ | FILE_SHARE_WRITE;
345 DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL | FILE_FLAG_RANDOM_ACCESS | FILE_FLAG_NO_BUFFERING;
346 Uint32 flags = request->par.open.flags;
348 // Convert file open flags from Solaris to Windows
349 if ((flags & FsOpenReq::OM_CREATE) && (flags & FsOpenReq::OM_TRUNCATE)){
350 dwCreationDisposition = CREATE_ALWAYS;
351 } else if (flags & FsOpenReq::OM_TRUNCATE){
352 dwCreationDisposition = TRUNCATE_EXISTING;
353 } else if (flags & FsOpenReq::OM_CREATE){
354 dwCreationDisposition = CREATE_NEW;
355 } else {
356 dwCreationDisposition = OPEN_EXISTING;
359 switch(flags & 3){
360 case FsOpenReq::OM_READONLY:
361 dwDesiredAccess = GENERIC_READ;
362 break;
363 case FsOpenReq::OM_WRITEONLY:
364 dwDesiredAccess = GENERIC_WRITE;
365 break;
366 case FsOpenReq::OM_READWRITE:
367 dwDesiredAccess = GENERIC_READ | GENERIC_WRITE;
368 break;
369 default:
370 request->error = 1000;
371 break;
372 return;
375 hFile = CreateFile(theFileName.c_str(), dwDesiredAccess, dwShareMode,
376 0, dwCreationDisposition, dwFlagsAndAttributes, 0);
378 if(INVALID_HANDLE_VALUE == hFile) {
379 request->error = GetLastError();
380 if(((ERROR_PATH_NOT_FOUND == request->error) || (ERROR_INVALID_NAME == request->error))
381 && (flags & FsOpenReq::OM_CREATE)) {
382 createDirectories();
383 hFile = CreateFile(theFileName.c_str(), dwDesiredAccess, dwShareMode,
384 0, dwCreationDisposition, dwFlagsAndAttributes, 0);
386 if(INVALID_HANDLE_VALUE == hFile)
387 request->error = GetLastError();
388 else
389 request->error = 0;
391 return;
394 else {
395 request->error = 0;
396 return;
398 #else
399 Uint32 flags = request->par.open.flags;
400 int new_flags = 0;
402 // Convert file open flags from Solaris to Liux
403 if (flags & FsOpenReq::OM_CREATE)
405 new_flags |= O_CREAT;
408 if (flags & FsOpenReq::OM_TRUNCATE){
409 #if 0
410 if(Global_unlinkO_CREAT){
411 unlink(theFileName.c_str());
412 } else
413 #endif
414 new_flags |= O_TRUNC;
417 if (flags & FsOpenReq::OM_AUTOSYNC)
419 m_auto_sync_freq = request->par.open.auto_sync_size;
422 if (flags & FsOpenReq::OM_APPEND){
423 new_flags |= O_APPEND;
426 if (flags & FsOpenReq::OM_DIRECT)
427 #ifdef O_DIRECT
429 new_flags |= O_DIRECT;
431 #endif
433 if ((flags & FsOpenReq::OM_SYNC) && ! (flags & FsOpenReq::OM_INIT))
435 #ifdef O_SYNC
436 new_flags |= O_SYNC;
437 #endif
440 const char * rw = "";
441 switch(flags & 0x3){
442 case FsOpenReq::OM_READONLY:
443 rw = "r";
444 new_flags |= O_RDONLY;
445 break;
446 case FsOpenReq::OM_WRITEONLY:
447 rw = "w";
448 new_flags |= O_WRONLY;
449 break;
450 case FsOpenReq::OM_READWRITE:
451 rw = "rw";
452 new_flags |= O_RDWR;
453 break;
454 default:
455 request->error = 1000;
456 break;
457 return;
460 // allow for user to choose any permissionsa with umask
461 const int mode = S_IRUSR | S_IWUSR |
462 S_IRGRP | S_IWGRP |
463 S_IROTH | S_IWOTH;
464 if (flags & FsOpenReq::OM_CREATE_IF_NONE)
466 Uint32 tmp_flags = new_flags;
467 #ifdef O_DIRECT
468 tmp_flags &= ~O_DIRECT;
469 #endif
470 if ((theFd = ::open(theFileName.c_str(), tmp_flags, mode)) != -1)
472 close(theFd);
473 request->error = FsRef::fsErrFileExists;
474 return;
476 new_flags |= O_CREAT;
479 no_odirect:
480 if (-1 == (theFd = ::open(theFileName.c_str(), new_flags, mode)))
482 PRINT_ERRORANDFLAGS(new_flags);
483 if ((errno == ENOENT) && (new_flags & O_CREAT))
485 createDirectories();
486 if (-1 == (theFd = ::open(theFileName.c_str(), new_flags, mode)))
488 #ifdef O_DIRECT
489 if (new_flags & O_DIRECT)
491 new_flags &= ~O_DIRECT;
492 goto no_odirect;
494 #endif
495 PRINT_ERRORANDFLAGS(new_flags);
496 request->error = errno;
497 return;
500 #ifdef O_DIRECT
501 else if (new_flags & O_DIRECT)
503 new_flags &= ~O_DIRECT;
504 goto no_odirect;
506 #endif
507 else
509 request->error = errno;
510 return;
514 if (flags & FsOpenReq::OM_CHECK_SIZE)
516 struct stat buf;
517 if ((fstat(theFd, &buf) == -1))
519 request->error = errno;
521 else if((Uint64)buf.st_size != request->par.open.file_size)
523 request->error = FsRef::fsErrInvalidFileSize;
525 if (request->error)
526 return;
529 if (flags & FsOpenReq::OM_INIT)
531 off_t off = 0;
532 const off_t sz = request->par.open.file_size;
533 Uint32 tmp[sizeof(SignalHeader)+25];
534 Signal * signal = (Signal*)(&tmp[0]);
535 FsReadWriteReq* req = (FsReadWriteReq*)signal->getDataPtrSend();
537 Uint32 index = 0;
538 Uint32 block = refToBlock(request->theUserReference);
540 #ifdef HAVE_XFS_XFS_H
541 if(platform_test_xfs_fd(theFd))
543 ndbout_c("Using xfsctl(XFS_IOC_RESVSP64) to allocate disk space");
544 xfs_flock64_t fl;
545 fl.l_whence= 0;
546 fl.l_start= 0;
547 fl.l_len= (off64_t)sz;
548 if(xfsctl(NULL, theFd, XFS_IOC_RESVSP64, &fl) < 0)
549 ndbout_c("failed to optimally allocate disk space");
551 #endif
552 #ifdef HAVE_POSIX_FALLOCATE
553 posix_fallocate(theFd, 0, sz);
554 #endif
556 while(off < sz)
558 req->filePointer = 0; // DATA 0
559 req->userPointer = request->theUserPointer; // DATA 2
560 req->numberOfPages = 1; // DATA 5
561 req->varIndex = index++;
562 req->data.pageData[0] = m_page_ptr.i;
564 m_fs.EXECUTE_DIRECT(block, GSN_FSWRITEREQ, signal,
565 FsReadWriteReq::FixedLength + 1);
566 retry:
567 Uint32 size = request->par.open.page_size;
568 char* buf = (char*)m_page_ptr.p;
569 while(size > 0){
570 const int n = write(theFd, buf, size);
571 if(n == -1 && errno == EINTR)
573 continue;
575 if(n == -1 || n == 0)
577 break;
579 size -= n;
580 buf += n;
582 if(size != 0)
584 int err = errno;
585 #ifdef O_DIRECT
586 if ((new_flags & O_DIRECT) && off == 0)
588 ndbout_c("error on first write(%d), disable O_DIRECT", err);
589 new_flags &= ~O_DIRECT;
590 close(theFd);
591 theFd = ::open(theFileName.c_str(), new_flags, mode);
592 if (theFd != -1)
593 goto retry;
595 #endif
596 close(theFd);
597 unlink(theFileName.c_str());
598 request->error = err;
599 return;
601 off += request->par.open.page_size;
603 if(lseek(theFd, 0, SEEK_SET) != 0)
604 request->error = errno;
606 else if (flags & FsOpenReq::OM_DIRECT)
608 #ifdef O_DIRECT
609 if (flags & (FsOpenReq::OM_TRUNCATE | FsOpenReq::OM_CREATE))
611 request->error = check_odirect_write(flags, new_flags, mode);
613 else
615 request->error = check_odirect_read(flags, new_flags, mode);
618 if (request->error)
619 return;
620 #endif
622 #ifdef VM_TRACE
623 if (flags & FsOpenReq::OM_DIRECT)
625 #ifdef O_DIRECT
626 ndbout_c("%s %s O_DIRECT: %d",
627 theFileName.c_str(), rw,
628 !!(new_flags & O_DIRECT));
629 #else
630 ndbout_c("%s %s O_DIRECT: 0",
631 theFileName.c_str(), rw);
632 #endif
634 #endif
635 if ((flags & FsOpenReq::OM_SYNC) && (flags & FsOpenReq::OM_INIT))
637 #ifdef O_SYNC
639 * reopen file with O_SYNC
641 close(theFd);
642 new_flags &= ~(O_CREAT | O_TRUNC);
643 new_flags |= O_SYNC;
644 theFd = ::open(theFileName.c_str(), new_flags, mode);
645 if (theFd == -1)
647 request->error = errno;
649 #endif
651 #endif
655 AsyncFile::readBuffer(Request* req, char * buf, size_t size, off_t offset){
656 int return_value;
657 req->par.readWrite.pages[0].size = 0;
658 #ifdef NDB_WIN32
659 DWORD dwSFP = SetFilePointer(hFile, offset, 0, FILE_BEGIN);
660 if(dwSFP != offset) {
661 return GetLastError();
663 #elif ! defined(HAVE_PREAD)
664 off_t seek_val;
665 while((seek_val= lseek(theFd, offset, SEEK_SET)) == (off_t)-1
666 && errno == EINTR);
667 if(seek_val == (off_t)-1)
669 return errno;
671 #endif
673 while (size > 0) {
674 size_t bytes_read = 0;
676 #ifdef NDB_WIN32
677 DWORD dwBytesRead;
678 BOOL bRead = ReadFile(hFile,
679 buf,
680 size,
681 &dwBytesRead,
683 if(!bRead){
684 return GetLastError();
686 bytes_read = dwBytesRead;
687 #elif ! defined(HAVE_PREAD)
688 return_value = ::read(theFd, buf, size);
689 #else // UNIX
690 return_value = ::pread(theFd, buf, size, offset);
691 #endif
692 #ifndef NDB_WIN32
693 if (return_value == -1 && errno == EINTR) {
694 DEBUG(ndbout_c("EINTR in read"));
695 continue;
696 } else if (return_value == -1){
697 return errno;
698 } else {
699 bytes_read = return_value;
701 #endif
703 req->par.readWrite.pages[0].size += bytes_read;
704 if(bytes_read == 0){
705 if(req->action == Request::readPartial)
707 return 0;
709 DEBUG(ndbout_c("Read underflow %d %d\n %x\n%d %d",
710 size, offset, buf, bytes_read, return_value));
711 return ERR_ReadUnderflow;
714 if(bytes_read != size){
715 DEBUG(ndbout_c("Warning partial read %d != %d",
716 bytes_read, size));
719 buf += bytes_read;
720 size -= bytes_read;
721 offset += bytes_read;
723 return 0;
726 void
727 AsyncFile::readReq( Request * request)
729 for(int i = 0; i < request->par.readWrite.numberOfPages ; i++) {
730 off_t offset = request->par.readWrite.pages[i].offset;
731 size_t size = request->par.readWrite.pages[i].size;
732 char * buf = request->par.readWrite.pages[i].buf;
734 int err = readBuffer(request, buf, size, offset);
735 if(err != 0){
736 request->error = err;
737 return;
742 void
743 AsyncFile::readvReq( Request * request)
745 #if ! defined(HAVE_PREAD)
746 readReq(request);
747 return;
748 #elif defined NDB_WIN32
749 // ReadFileScatter?
750 readReq(request);
751 return;
752 #else
753 int return_value;
754 int length = 0;
755 struct iovec iov[20]; // the parameter in the signal restricts this to 20 deep
756 for(int i=0; i < request->par.readWrite.numberOfPages ; i++) {
757 iov[i].iov_base= request->par.readWrite.pages[i].buf;
758 iov[i].iov_len= request->par.readWrite.pages[i].size;
759 length = length + iov[i].iov_len;
761 lseek( theFd, request->par.readWrite.pages[0].offset, SEEK_SET );
762 return_value = ::readv(theFd, iov, request->par.readWrite.numberOfPages);
763 if (return_value == -1) {
764 request->error = errno;
765 return;
766 } else if (return_value != length) {
767 request->error = 1011;
768 return;
770 #endif
773 int
774 AsyncFile::extendfile(Request* request) {
775 #if ! defined(HAVE_PWRITE)
776 // Find max size of this file in this request
777 int maxOffset = 0;
778 int maxSize = 0;
779 for(int i=0; i < request->par.readWrite.numberOfPages ; i++) {
780 if (request->par.readWrite.pages[i].offset > maxOffset) {
781 maxOffset = request->par.readWrite.pages[i].offset;
782 maxSize = request->par.readWrite.pages[i].size;
785 DEBUG(ndbout_c("extendfile: maxOffset=%d, size=%d", maxOffset, maxSize));
787 // Allocate a buffer and fill it with zeros
788 void* pbuf = ndbd_malloc(maxSize);
789 memset(pbuf, 0, maxSize);
790 for (int p = 0; p <= maxOffset; p = p + maxSize) {
791 int return_value;
792 return_value = lseek(theFd,
794 SEEK_SET);
795 if((return_value == -1 ) || (return_value != p)) {
796 ndbd_free(pbuf,maxSize);
797 return -1;
799 return_value = ::write(theFd,
800 pbuf,
801 maxSize);
802 if ((return_value == -1) || (return_value != maxSize)) {
803 ndbd_free(pbuf,maxSize);
804 return -1;
807 ndbd_free(pbuf,maxSize);
809 DEBUG(ndbout_c("extendfile: \"%s\" OK!", theFileName.c_str()));
810 return 0;
811 #else
812 request = request;
813 DEBUG(ndbout_c("no pwrite"));
814 abort();
815 return -1;
816 #endif
819 void
820 AsyncFile::writeReq( Request * request)
822 int page_num = 0;
823 bool write_not_complete = true;
825 while(write_not_complete) {
826 int totsize = 0;
827 off_t offset = request->par.readWrite.pages[page_num].offset;
828 char* bufptr = theWriteBuffer;
830 write_not_complete = false;
831 if (request->par.readWrite.numberOfPages > 1) {
832 off_t page_offset = offset;
834 // Multiple page write, copy to buffer for one write
835 for(int i=page_num; i < request->par.readWrite.numberOfPages; i++) {
836 memcpy(bufptr,
837 request->par.readWrite.pages[i].buf,
838 request->par.readWrite.pages[i].size);
839 bufptr += request->par.readWrite.pages[i].size;
840 totsize += request->par.readWrite.pages[i].size;
841 if (((i + 1) < request->par.readWrite.numberOfPages)) {
842 // There are more pages to write
843 // Check that offsets are consequtive
844 off_t tmp = page_offset + request->par.readWrite.pages[i].size;
845 if (tmp != request->par.readWrite.pages[i+1].offset) {
846 // Next page is not aligned with previous, not allowed
847 DEBUG(ndbout_c("Page offsets are not aligned"));
848 request->error = EINVAL;
849 return;
851 if ((unsigned)(totsize + request->par.readWrite.pages[i+1].size) > (unsigned)theWriteBufferSize) {
852 // We are not finished and the buffer is full
853 write_not_complete = true;
854 // Start again with next page
855 page_num = i + 1;
856 break;
859 page_offset += request->par.readWrite.pages[i].size;
861 bufptr = theWriteBuffer;
862 } else {
863 // One page write, write page directly
864 bufptr = request->par.readWrite.pages[0].buf;
865 totsize = request->par.readWrite.pages[0].size;
867 int err = writeBuffer(bufptr, totsize, offset);
868 if(err != 0){
869 request->error = err;
870 return;
872 } // while(write_not_complete)
874 if(m_auto_sync_freq && m_write_wo_sync > m_auto_sync_freq){
875 syncReq(request);
880 AsyncFile::writeBuffer(const char * buf, size_t size, off_t offset,
881 size_t chunk_size)
883 size_t bytes_to_write = chunk_size;
884 int return_value;
886 m_write_wo_sync += size;
888 #ifdef NDB_WIN32
889 DWORD dwSFP = SetFilePointer(hFile, offset, 0, FILE_BEGIN);
890 if(dwSFP != offset) {
891 return GetLastError();
893 #elif ! defined(HAVE_PWRITE)
894 off_t seek_val;
895 while((seek_val= lseek(theFd, offset, SEEK_SET)) == (off_t)-1
896 && errno == EINTR);
897 if(seek_val == (off_t)-1)
899 return errno;
901 #endif
903 while (size > 0) {
904 if (size < bytes_to_write){
905 // We are at the last chunk
906 bytes_to_write = size;
908 size_t bytes_written = 0;
910 #ifdef NDB_WIN32
911 DWORD dwWritten;
912 BOOL bWrite = WriteFile(hFile, buf, bytes_to_write, &dwWritten, 0);
913 if(!bWrite) {
914 return GetLastError();
916 bytes_written = dwWritten;
917 if (bytes_written != bytes_to_write) {
918 DEBUG(ndbout_c("Warning partial write %d != %d", bytes_written, bytes_to_write));
921 #elif ! defined(HAVE_PWRITE)
922 return_value = ::write(theFd, buf, bytes_to_write);
923 #else // UNIX
924 return_value = ::pwrite(theFd, buf, bytes_to_write, offset);
925 #endif
926 #ifndef NDB_WIN32
927 if (return_value == -1 && errno == EINTR) {
928 bytes_written = 0;
929 DEBUG(ndbout_c("EINTR in write"));
930 } else if (return_value == -1){
931 return errno;
932 } else {
933 bytes_written = return_value;
935 if(bytes_written == 0){
936 DEBUG(ndbout_c("no bytes written"));
937 abort();
940 if(bytes_written != bytes_to_write){
941 DEBUG(ndbout_c("Warning partial write %d != %d",
942 bytes_written, bytes_to_write));
945 #endif
947 buf += bytes_written;
948 size -= bytes_written;
949 offset += bytes_written;
951 return 0;
954 void
955 AsyncFile::writevReq( Request * request)
957 // WriteFileGather on WIN32?
958 writeReq(request);
962 void
963 AsyncFile::closeReq(Request * request)
965 if (m_open_flags & (
966 FsOpenReq::OM_WRITEONLY |
967 FsOpenReq::OM_READWRITE |
968 FsOpenReq::OM_APPEND )) {
969 syncReq(request);
971 #ifdef NDB_WIN32
972 if(!CloseHandle(hFile)) {
973 request->error = GetLastError();
975 hFile = INVALID_HANDLE_VALUE;
976 #else
977 if (-1 == ::close(theFd)) {
978 #ifndef DBUG_OFF
979 if (theFd == -1) {
980 DEBUG(ndbout_c("close on fd = -1"));
981 abort();
983 #endif
984 request->error = errno;
986 theFd = -1;
987 #endif
990 bool AsyncFile::isOpen(){
991 #ifdef NDB_WIN32
992 return (hFile != INVALID_HANDLE_VALUE);
993 #else
994 return (theFd != -1);
995 #endif
999 void
1000 AsyncFile::syncReq(Request * request)
1002 if(m_auto_sync_freq && m_write_wo_sync == 0){
1003 return;
1005 #ifdef NDB_WIN32
1006 if(!FlushFileBuffers(hFile)) {
1007 request->error = GetLastError();
1008 return;
1010 #else
1011 if (-1 == ::fsync(theFd)){
1012 request->error = errno;
1013 return;
1015 #endif
1016 m_write_wo_sync = 0;
1019 void
1020 AsyncFile::appendReq(Request * request){
1022 const char * buf = request->par.append.buf;
1023 Uint32 size = request->par.append.size;
1025 m_write_wo_sync += size;
1027 #ifdef NDB_WIN32
1028 DWORD dwWritten = 0;
1029 while(size > 0){
1030 if(!WriteFile(hFile, buf, size, &dwWritten, 0)){
1031 request->error = GetLastError();
1032 return ;
1035 buf += dwWritten;
1036 size -= dwWritten;
1038 #else
1039 while(size > 0){
1040 const int n = write(theFd, buf, size);
1041 if(n == -1 && errno == EINTR){
1042 continue;
1044 if(n == -1){
1045 request->error = errno;
1046 return;
1048 if(n == 0){
1049 DEBUG(ndbout_c("append with n=0"));
1050 abort();
1052 size -= n;
1053 buf += n;
1055 #endif
1057 if(m_auto_sync_freq && m_write_wo_sync > m_auto_sync_freq){
1058 syncReq(request);
1062 void
1063 AsyncFile::removeReq(Request * request)
1065 #ifdef NDB_WIN32
1066 if(!DeleteFile(theFileName.c_str())) {
1067 request->error = GetLastError();
1069 #else
1070 if (-1 == ::remove(theFileName.c_str())) {
1071 request->error = errno;
1074 #endif
1077 void
1078 AsyncFile::rmrfReq(Request * request, char * path, bool removePath){
1079 Uint32 path_len = strlen(path);
1080 Uint32 path_max_copy = PATH_MAX - path_len;
1081 char* path_add = &path[path_len];
1082 #ifndef NDB_WIN32
1083 if(!request->par.rmrf.directory){
1084 // Remove file
1085 if(unlink((const char *)path) != 0 && errno != ENOENT)
1086 request->error = errno;
1087 return;
1089 // Remove directory
1090 DIR* dirp = opendir((const char *)path);
1091 if(dirp == 0){
1092 if(errno != ENOENT)
1093 request->error = errno;
1094 return;
1096 struct dirent * dp;
1097 while ((dp = readdir(dirp)) != NULL){
1098 if ((strcmp(".", dp->d_name) != 0) && (strcmp("..", dp->d_name) != 0)) {
1099 BaseString::snprintf(path_add, (size_t)path_max_copy, "%s%s",
1100 DIR_SEPARATOR, dp->d_name);
1101 if(remove((const char*)path) == 0){
1102 path[path_len] = 0;
1103 continue;
1106 rmrfReq(request, path, true);
1107 path[path_len] = 0;
1108 if(request->error != 0){
1109 closedir(dirp);
1110 return;
1114 closedir(dirp);
1115 if(removePath && rmdir((const char *)path) != 0){
1116 request->error = errno;
1118 return;
1119 #else
1121 if(!request->par.rmrf.directory){
1122 // Remove file
1123 if(!DeleteFile(path)){
1124 DWORD dwError = GetLastError();
1125 if(dwError!=ERROR_FILE_NOT_FOUND)
1126 request->error = dwError;
1128 return;
1131 strcat(path, "\\*");
1132 WIN32_FIND_DATA ffd;
1133 HANDLE hFindFile = FindFirstFile(path, &ffd);
1134 path[path_len] = 0;
1135 if(INVALID_HANDLE_VALUE==hFindFile){
1136 DWORD dwError = GetLastError();
1137 if(dwError!=ERROR_PATH_NOT_FOUND)
1138 request->error = dwError;
1139 return;
1142 do {
1143 if(0!=strcmp(".", ffd.cFileName) && 0!=strcmp("..", ffd.cFileName)){
1144 strcat(path, "\\");
1145 strcat(path, ffd.cFileName);
1146 if(DeleteFile(path)) {
1147 path[path_len] = 0;
1148 continue;
1149 }//if
1151 rmrfReq(request, path, true);
1152 path[path_len] = 0;
1153 if(request->error != 0){
1154 FindClose(hFindFile);
1155 return;
1158 } while(FindNextFile(hFindFile, &ffd));
1160 FindClose(hFindFile);
1162 if(removePath && !RemoveDirectory(path))
1163 request->error = GetLastError();
1165 #endif
1168 void AsyncFile::endReq()
1170 // Thread is ended with return
1171 if (theWriteBufferUnaligned)
1172 ndbd_free(theWriteBufferUnaligned, theWriteBufferSize);
1176 void AsyncFile::createDirectories()
1178 char* tmp;
1179 const char * name = theFileName.c_str();
1180 const char * base = theFileName.get_base_name();
1181 while((tmp = (char *)strstr(base, DIR_SEPARATOR)))
1183 char t = tmp[0];
1184 tmp[0] = 0;
1185 #ifdef NDB_WIN32
1186 CreateDirectory(name, 0);
1187 #else
1188 mkdir(name, S_IRUSR | S_IWUSR | S_IXUSR | S_IXGRP | S_IRGRP);
1189 #endif
1190 tmp[0] = t;
1191 base = tmp + sizeof(DIR_SEPARATOR);
#ifdef DEBUG_ASYNCFILE
/**
 * Debug helper: print the current errno by name together with the names of
 * the open(2) flags set in used_flags.
 *
 * @param used_flags  flags that were passed to open()
 */
void printErrorAndFlags(Uint32 used_flags) {
  struct NamedValue { int value; const char* name; };

  static const NamedValue errnoNames[] = {
    { EACCES,       "EACCES" },
    { EDQUOT,       "EDQUOT" },
    { EEXIST,       "EEXIST" },
    { EINTR,        "EINTR" },
    { EFAULT,       "EFAULT" },
    { EIO,          "EIO" },
    { EISDIR,       "EISDIR" },
    { ELOOP,        "ELOOP" },
    { EMFILE,       "EMFILE" },
    { ENFILE,       "ENFILE" },
    { ENOENT,       "ENOENT " },
    { ENOSPC,       "ENOSPC" },
    { ENOTDIR,      "ENOTDIR" },
    { ENXIO,        "ENXIO" },
    { EOPNOTSUPP,   "EOPNOTSUPP" },
    { EMULTIHOP,    "EMULTIHOP" },
    { ENOLINK,      "ENOLINK" },
    { ENOSR,        "ENOSR" },
    { EOVERFLOW,    "EOVERFLOW" },
    { EROFS,        "EROFS" },
    { EAGAIN,       "EAGAIN" },
    { EINVAL,       "EINVAL" },
    { ENOMEM,       "ENOMEM" },
    { ETXTBSY,      "ETXTBSY" },
    { ENAMETOOLONG, "ENAMETOOLONG" },
    { EBADF,        "EBADF" },
    { ESPIPE,       "ESPIPE" },
    { ESTALE,       "ESTALE" }
  };

  static const NamedValue flagNames[] = {
    { O_APPEND,   "O_APPEND, " },
    { O_CREAT,    "O_CREAT, " },
    { O_EXCL,     "O_EXCL, " },
    { O_NOCTTY,   "O_NOCTTY, " },
    { O_NONBLOCK, "O_NONBLOCK, " },
    { O_TRUNC,    "O_TRUNC, " },
    { O_DSYNC,    "O_DSYNC, " },
    { O_NDELAY,   "O_NDELAY, " },
    { O_RSYNC,    "O_RSYNC, " }
#ifdef O_SYNC
    , { O_SYNC,   "O_SYNC, " }
#endif
  };

  // Capture errno first: the library calls below are allowed to change it
  const int err = errno;

  char buf[255];
  sprintf(buf, "PEAF: errno=%d \"", err);

  const char* errName = "EOTHER";
  for (size_t i = 0; i < sizeof(errnoNames)/sizeof(errnoNames[0]); i++) {
    if (errnoNames[i].value == err) {
      errName = errnoNames[i].name;
      break;
    }
  }
  strcat(buf, errName);
  strcat(buf, "\" ");
  strcat(buf, " flags: ");

  // The access mode is a field inside the flag word, not a set of bits
  switch(used_flags & O_ACCMODE){
  case O_RDONLY:
    strcat(buf, "O_RDONLY, ");
    break;
  case O_WRONLY:
    strcat(buf, "O_WRONLY, ");
    break;
  case O_RDWR:
    strcat(buf, "O_RDWR, ");
    break;
  default:
    strcat(buf, "Unknown!!, ");
  }

  for (size_t i = 0; i < sizeof(flagNames)/sizeof(flagNames[0]); i++) {
    const Uint32 bit = (Uint32)flagNames[i].value;
    if ((used_flags & bit) == bit)
      strcat(buf, flagNames[i].name);
  }

  // Never pass assembled text as the format string itself
  DEBUG(ndbout_c("%s", buf));

}
#endif
1332 NdbOut&
1333 operator<<(NdbOut& out, const Request& req)
1335 out << "[ Request: file: " << hex << req.file
1336 << " userRef: " << hex << req.theUserReference
1337 << " userData: " << dec << req.theUserPointer
1338 << " theFilePointer: " << req.theFilePointer
1339 << " action: ";
1340 switch(req.action){
1341 case Request::open:
1342 out << "open";
1343 break;
1344 case Request::close:
1345 out << "close";
1346 break;
1347 case Request::closeRemove:
1348 out << "closeRemove";
1349 break;
1350 case Request::read: // Allways leave readv directly after
1351 out << "read";
1352 break;
1353 case Request::readv:
1354 out << "readv";
1355 break;
1356 case Request::write:// Allways leave writev directly after
1357 out << "write";
1358 break;
1359 case Request::writev:
1360 out << "writev";
1361 break;
1362 case Request::writeSync:// Allways leave writevSync directly after
1363 out << "writeSync";
1364 break;
1365 // writeSync because SimblockAsyncFileSystem depends on it
1366 case Request::writevSync:
1367 out << "writevSync";
1368 break;
1369 case Request::sync:
1370 out << "sync";
1371 break;
1372 case Request::end:
1373 out << "end";
1374 break;
1375 case Request::append:
1376 out << "append";
1377 break;
1378 case Request::rmrf:
1379 out << "rmrf";
1380 break;
1381 default:
1382 out << (Uint32)req.action;
1383 break;
1385 out << " ]";
1386 return out;