1 /* Copyright (c) 2003-2007 MySQL AB, 2009 Sun Microsystems, Inc.
2 Use is subject to license terms.
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
17 #include <ndb_global.h>
19 #include <my_pthread.h>
25 #include "AsyncFile.hpp"
27 #include <ErrorHandlingMacros.hpp>
28 #include <kernel_types.h>
29 #include <ndbd_malloc.hpp>
30 #include <NdbThread.h>
31 #include <signaldata/FsRef.hpp>
32 #include <signaldata/FsOpenReq.hpp>
33 #include <signaldata/FsReadWriteReq.hpp>
35 // use this to test broken pread code
36 //#define HAVE_BROKEN_PREAD
38 #ifdef HAVE_BROKEN_PREAD
45 // For readv and writev
53 // Use this define if you want printouts from AsyncFile class
54 //#define DEBUG_ASYNCFILE
56 #ifdef DEBUG_ASYNCFILE
59 #define PRINT_ERRORANDFLAGS(f) printErrorAndFlags(f)
60 void printErrorAndFlags(Uint32 used_flags
);
63 #define PRINT_ERRORANDFLAGS(f)
66 // Define the size of the write buffer (for each thread)
67 #define WRITEBUFFERSIZE 262144
69 const char *actionName
[] = {
// File-local counter, initialised to zero. Its value is formatted into the
// "AsyncFile%d" name built further down in this file; presumably it counts
// the AsyncFile objects started so far (the increment is not visible here).
82 static int numAsyncFiles
= 0;
// Thread entry point with C linkage; it is the function passed to
// NdbThread_Create further down in this file. The opaque argument is cast
// back to an AsyncFile* and that object's run() method is invoked.
84 extern "C" void * runAsyncFile(void* arg
)
86 ((AsyncFile
*)arg
)->run();
// Constructor, taking the owning SimulatedBlock by reference.
// The initializers visible here leave the object without an open file
// (hFile = INVALID_HANDLE_VALUE) and without a memory channel
// (theMemoryChannelPtr = NULL). Further initializers may exist but are not
// visible in this view.
90 AsyncFile::AsyncFile(SimulatedBlock
& fs
) :
93 hFile(INVALID_HANDLE_VALUE
),
98 theMemoryChannelPtr(NULL
),
// No page attached yet, and no request currently running or last completed.
101 m_page_ptr
.setNull();
102 m_current_request
= m_last_request
= 0;
109 // Stacksize for filesystem threads
110 #if !defined(DBUG_OFF) && defined (__hpux)
111 // Empirical evidence indicates at least 32k
112 const NDB_THREAD_STACKSIZE stackSize
= 32768;
114 // Otherwise an 8k stack should be enough
115 const NDB_THREAD_STACKSIZE stackSize
= 8192;
120 BaseString::snprintf(buf
, sizeof(buf
), "AsyncFile%d", numAsyncFiles
);
122 theStartMutexPtr
= NdbMutex_Create();
123 theStartConditionPtr
= NdbCondition_Create();
124 NdbMutex_Lock(theStartMutexPtr
);
125 theStartFlag
= false;
126 theThreadPtr
= NdbThread_Create(runAsyncFile
,
130 NDB_THREAD_PRIO_MEAN
);
131 if (theThreadPtr
== 0)
132 ERROR_SET(fatal
, NDBD_EXIT_MEMALLOC
, "","Could not allocate file system thread");
134 NdbCondition_Wait(theStartConditionPtr
,
136 NdbMutex_Unlock(theStartMutexPtr
);
137 NdbMutex_Destroy(theStartMutexPtr
);
138 NdbCondition_Destroy(theStartConditionPtr
);
// Destructor. Shuts the file thread down in this order:
//   1. posts a request whose action is Request::end on the object's own
//      memory channel,
//   2. waits on the thread with NdbThread_WaitFor,
//   3. destroys the thread handle,
//   4. deletes the memory channel.
// NOTE(review): the declarations of `request` and `status` are not visible
// in this view.
141 AsyncFile::~AsyncFile()
145 request
.action
= Request::end
;
146 theMemoryChannelPtr
->writeChannel( &request
);
147 NdbThread_WaitFor(theThreadPtr
, &status
);
148 NdbThread_Destroy(&theThreadPtr
);
// The channel is deleted only after the thread has been waited for.
149 delete theMemoryChannelPtr
;
// Stores the channel used to hand processed requests back. The request loop
// elsewhere in this file writes each finished request to theReportTo with
// writeChannelNoSignal.
//   reportTo - channel to remember; only the pointer is stored here.
153 AsyncFile::reportTo( MemoryChannel
<Request
> *reportTo
)
155 theReportTo
= reportTo
;
// Submits a request to this file by writing the pointer to the object's
// memory channel (theMemoryChannelPtr). No other work is visible here.
//   request - the request to enqueue.
158 void AsyncFile::execute(Request
* request
)
160 theMemoryChannelPtr
->writeChannel( request
);
167 // Create theMemoryChannel in the thread that will wait for it
168 NdbMutex_Lock(theStartMutexPtr
);
169 theMemoryChannelPtr
= new MemoryChannel
<Request
>();
171 // Create write buffer for bigger writes
172 theWriteBufferSize
= WRITEBUFFERSIZE
;
173 theWriteBufferUnaligned
= (char *) ndbd_malloc(theWriteBufferSize
+
174 NDB_O_DIRECT_WRITE_ALIGNMENT
-1);
175 theWriteBuffer
= (char *)
176 (((UintPtr
)theWriteBufferUnaligned
+ NDB_O_DIRECT_WRITE_ALIGNMENT
- 1) &
177 ~(UintPtr
)(NDB_O_DIRECT_WRITE_ALIGNMENT
- 1));
179 NdbMutex_Unlock(theStartMutexPtr
);
180 NdbCondition_Signal(theStartConditionPtr
);
182 if (!theWriteBuffer
) {
183 DEBUG(ndbout_c("AsyncFile::writeReq, Failed allocating write buffer"));
188 request
= theMemoryChannelPtr
->readChannel();
190 DEBUG(ndbout_c("Nothing read from Memory Channel in AsyncFile"));
194 m_current_request
= request
;
195 switch (request
->action
) {
199 case Request:: close
:
202 case Request:: closeRemove
:
206 case Request:: readPartial
:
210 case Request:: readv
:
213 case Request:: write
:
216 case Request:: writev
:
219 case Request:: writeSync
:
223 case Request:: writevSync
:
230 case Request:: append
:
233 case Request:: append_synch
:
238 rmrfReq(request
, (char*)theFileName
.c_str(), request
->par
.rmrf
.own_directory
);
246 DEBUG(ndbout_c("Invalid Request"));
250 m_last_request
= request
;
251 m_current_request
= 0;
253 // No need to signal as ndbfs only uses tryRead
254 theReportTo
->writeChannelNoSignal(request
);
// Static scratch buffer used by check_odirect_write and check_odirect_read.
// Both functions round its address up to a GLOBAL_PAGE_SIZE boundary and
// then transfer GLOBAL_PAGE_SIZE bytes; the 2*GLOBAL_PAGE_SIZE-1 size
// guarantees that one whole aligned page fits wherever the array starts.
259 static char g_odirect_readbuf
[2*GLOBAL_PAGE_SIZE
-1];
// Probes whether theFd accepts a write, by writing one GLOBAL_PAGE_SIZE
// page from an aligned scratch buffer.
//   flags     - FsOpenReq open flags (no use of them is visible in this view).
//   new_flags - in/out open(2) flags; O_DIRECT is cleared on the failure path.
//   mode      - permission bits passed to the re-open.
// The caller (openReq) stores the return value in request->error.
// NOTE(review): the second half of the retry condition, the branch that
// guards the fallback, and the return statements are not visible here.
263 AsyncFile::check_odirect_write(Uint32 flags
, int& new_flags
, int mode
)
// This probe is only meaningful for files being created or truncated.
265 assert(new_flags
& (O_CREAT
| O_TRUNC
));
// Round the scratch buffer address up to the next GLOBAL_PAGE_SIZE boundary.
268 char * bufptr
= (char*)((UintPtr(g_odirect_readbuf
)+(GLOBAL_PAGE_SIZE
- 1)) & ~(GLOBAL_PAGE_SIZE
- 1));
// Retry loop around the probe write (remainder of the condition not visible).
269 while (((ret
= ::write(theFd
, bufptr
, GLOBAL_PAGE_SIZE
)) == -1) &&
// Fallback: drop O_DIRECT, log the fact, and reopen the file without it.
273 new_flags
&= ~O_DIRECT
;
274 ndbout_c("%s Failed to write using O_DIRECT, disabling",
275 theFileName
.c_str());
279 theFd
= ::open(theFileName
.c_str(), new_flags
, mode
);
// Probes whether theFd accepts a read, by reading one GLOBAL_PAGE_SIZE page
// into an aligned scratch buffer, and checks whether the file size suits
// O_DIRECT.
//   flags     - FsOpenReq open flags; OM_CHECK_SIZE is tested below.
//   new_flags - in/out open(2) flags; O_DIRECT is cleared on the fallback path.
//   mode      - permission bits passed to the re-open.
// The caller (openReq) stores the return value in request->error.
// NOTE(review): the second half of the retry condition, the bodies of
// several branches and the return statements are not visible here.
288 AsyncFile::check_odirect_read(Uint32 flags
, int &new_flags
, int mode
)
// Round the scratch buffer address up to the next GLOBAL_PAGE_SIZE boundary.
292 char * bufptr
= (char*)((UintPtr(g_odirect_readbuf
)+(GLOBAL_PAGE_SIZE
- 1)) & ~(GLOBAL_PAGE_SIZE
- 1));
// Retry loop around the probe read (remainder of the condition not visible).
293 while (((ret
= ::read(theFd
, bufptr
, GLOBAL_PAGE_SIZE
)) == -1) &&
297 ndbout_c("%s Failed to read using O_DIRECT, disabling",
298 theFileName
.c_str());
// Rewind to offset 0 so the probe read does not leave the file position moved.
302 if(lseek(theFd
, 0, SEEK_SET
) != 0)
// When the caller did not request OM_CHECK_SIZE, the size is examined here.
307 if ((flags
& FsOpenReq::OM_CHECK_SIZE
) == 0)
310 if ((fstat(theFd
, &buf
) == -1))
// A size that is not a whole number of pages is logged as disabling O_DIRECT.
314 else if ((buf
.st_size
% GLOBAL_PAGE_SIZE
) != 0)
316 ndbout_c("%s filesize not a multiple of %d, disabling O_DIRECT",
317 theFileName
.c_str(), GLOBAL_PAGE_SIZE
);
// Fallback: drop O_DIRECT and reopen the file without it.
326 new_flags
&= ~O_DIRECT
;
327 theFd
= ::open(theFileName
.c_str(), new_flags
, mode
);
334 void AsyncFile::openReq(Request
* request
)
336 m_auto_sync_freq
= 0;
338 m_open_flags
= request
->par
.open
.flags
;
340 // for open.flags, see signal FSOPENREQ
342 DWORD dwCreationDisposition
;
343 DWORD dwDesiredAccess
= 0;
344 DWORD dwShareMode
= FILE_SHARE_READ
| FILE_SHARE_WRITE
;
345 DWORD dwFlagsAndAttributes
= FILE_ATTRIBUTE_NORMAL
| FILE_FLAG_RANDOM_ACCESS
| FILE_FLAG_NO_BUFFERING
;
346 Uint32 flags
= request
->par
.open
.flags
;
348 // Convert file open flags from Solaris to Windows
349 if ((flags
& FsOpenReq::OM_CREATE
) && (flags
& FsOpenReq::OM_TRUNCATE
)){
350 dwCreationDisposition
= CREATE_ALWAYS
;
351 } else if (flags
& FsOpenReq::OM_TRUNCATE
){
352 dwCreationDisposition
= TRUNCATE_EXISTING
;
353 } else if (flags
& FsOpenReq::OM_CREATE
){
354 dwCreationDisposition
= CREATE_NEW
;
356 dwCreationDisposition
= OPEN_EXISTING
;
360 case FsOpenReq::OM_READONLY
:
361 dwDesiredAccess
= GENERIC_READ
;
363 case FsOpenReq::OM_WRITEONLY
:
364 dwDesiredAccess
= GENERIC_WRITE
;
366 case FsOpenReq::OM_READWRITE
:
367 dwDesiredAccess
= GENERIC_READ
| GENERIC_WRITE
;
370 request
->error
= 1000;
375 hFile
= CreateFile(theFileName
.c_str(), dwDesiredAccess
, dwShareMode
,
376 0, dwCreationDisposition
, dwFlagsAndAttributes
, 0);
378 if(INVALID_HANDLE_VALUE
== hFile
) {
379 request
->error
= GetLastError();
380 if(((ERROR_PATH_NOT_FOUND
== request
->error
) || (ERROR_INVALID_NAME
== request
->error
))
381 && (flags
& FsOpenReq::OM_CREATE
)) {
383 hFile
= CreateFile(theFileName
.c_str(), dwDesiredAccess
, dwShareMode
,
384 0, dwCreationDisposition
, dwFlagsAndAttributes
, 0);
386 if(INVALID_HANDLE_VALUE
== hFile
)
387 request
->error
= GetLastError();
399 Uint32 flags
= request
->par
.open
.flags
;
402 // Convert file open flags from Solaris to Linux
403 if (flags
& FsOpenReq::OM_CREATE
)
405 new_flags
|= O_CREAT
;
408 if (flags
& FsOpenReq::OM_TRUNCATE
){
410 if(Global_unlinkO_CREAT
){
411 unlink(theFileName
.c_str());
414 new_flags
|= O_TRUNC
;
417 if (flags
& FsOpenReq::OM_AUTOSYNC
)
419 m_auto_sync_freq
= request
->par
.open
.auto_sync_size
;
422 if (flags
& FsOpenReq::OM_APPEND
){
423 new_flags
|= O_APPEND
;
426 if (flags
& FsOpenReq::OM_DIRECT
)
429 new_flags
|= O_DIRECT
;
433 if ((flags
& FsOpenReq::OM_SYNC
) && ! (flags
& FsOpenReq::OM_INIT
))
440 const char * rw
= "";
442 case FsOpenReq::OM_READONLY
:
444 new_flags
|= O_RDONLY
;
446 case FsOpenReq::OM_WRITEONLY
:
448 new_flags
|= O_WRONLY
;
450 case FsOpenReq::OM_READWRITE
:
455 request
->error
= 1000;
460 // allow the user to choose any permissions with umask
461 const int mode
= S_IRUSR
| S_IWUSR
|
464 if (flags
& FsOpenReq::OM_CREATE_IF_NONE
)
466 Uint32 tmp_flags
= new_flags
;
468 tmp_flags
&= ~O_DIRECT
;
470 if ((theFd
= ::open(theFileName
.c_str(), tmp_flags
, mode
)) != -1)
473 request
->error
= FsRef::fsErrFileExists
;
476 new_flags
|= O_CREAT
;
480 if (-1 == (theFd
= ::open(theFileName
.c_str(), new_flags
, mode
)))
482 PRINT_ERRORANDFLAGS(new_flags
);
483 if ((errno
== ENOENT
) && (new_flags
& O_CREAT
))
486 if (-1 == (theFd
= ::open(theFileName
.c_str(), new_flags
, mode
)))
489 if (new_flags
& O_DIRECT
)
491 new_flags
&= ~O_DIRECT
;
495 PRINT_ERRORANDFLAGS(new_flags
);
496 request
->error
= errno
;
501 else if (new_flags
& O_DIRECT
)
503 new_flags
&= ~O_DIRECT
;
509 request
->error
= errno
;
514 if (flags
& FsOpenReq::OM_CHECK_SIZE
)
517 if ((fstat(theFd
, &buf
) == -1))
519 request
->error
= errno
;
521 else if((Uint64
)buf
.st_size
!= request
->par
.open
.file_size
)
523 request
->error
= FsRef::fsErrInvalidFileSize
;
529 if (flags
& FsOpenReq::OM_INIT
)
532 const off_t sz
= request
->par
.open
.file_size
;
533 Uint32 tmp
[sizeof(SignalHeader
)+25];
534 Signal
* signal
= (Signal
*)(&tmp
[0]);
535 FsReadWriteReq
* req
= (FsReadWriteReq
*)signal
->getDataPtrSend();
538 Uint32 block
= refToBlock(request
->theUserReference
);
540 #ifdef HAVE_XFS_XFS_H
541 if(platform_test_xfs_fd(theFd
))
543 ndbout_c("Using xfsctl(XFS_IOC_RESVSP64) to allocate disk space");
547 fl
.l_len
= (off64_t
)sz
;
548 if(xfsctl(NULL
, theFd
, XFS_IOC_RESVSP64
, &fl
) < 0)
549 ndbout_c("failed to optimally allocate disk space");
552 #ifdef HAVE_POSIX_FALLOCATE
553 posix_fallocate(theFd
, 0, sz
);
558 req
->filePointer
= 0; // DATA 0
559 req
->userPointer
= request
->theUserPointer
; // DATA 2
560 req
->numberOfPages
= 1; // DATA 5
561 req
->varIndex
= index
++;
562 req
->data
.pageData
[0] = m_page_ptr
.i
;
564 m_fs
.EXECUTE_DIRECT(block
, GSN_FSWRITEREQ
, signal
,
565 FsReadWriteReq::FixedLength
+ 1);
567 Uint32 size
= request
->par
.open
.page_size
;
568 char* buf
= (char*)m_page_ptr
.p
;
570 const int n
= write(theFd
, buf
, size
);
571 if(n
== -1 && errno
== EINTR
)
575 if(n
== -1 || n
== 0)
586 if ((new_flags
& O_DIRECT
) && off
== 0)
588 ndbout_c("error on first write(%d), disable O_DIRECT", err
);
589 new_flags
&= ~O_DIRECT
;
591 theFd
= ::open(theFileName
.c_str(), new_flags
, mode
);
597 unlink(theFileName
.c_str());
598 request
->error
= err
;
601 off
+= request
->par
.open
.page_size
;
603 if(lseek(theFd
, 0, SEEK_SET
) != 0)
604 request
->error
= errno
;
606 else if (flags
& FsOpenReq::OM_DIRECT
)
609 if (flags
& (FsOpenReq::OM_TRUNCATE
| FsOpenReq::OM_CREATE
))
611 request
->error
= check_odirect_write(flags
, new_flags
, mode
);
615 request
->error
= check_odirect_read(flags
, new_flags
, mode
);
623 if (flags
& FsOpenReq::OM_DIRECT
)
626 ndbout_c("%s %s O_DIRECT: %d",
627 theFileName
.c_str(), rw
,
628 !!(new_flags
& O_DIRECT
));
630 ndbout_c("%s %s O_DIRECT: 0",
631 theFileName
.c_str(), rw
);
635 if ((flags
& FsOpenReq::OM_SYNC
) && (flags
& FsOpenReq::OM_INIT
))
639 * reopen file with O_SYNC
642 new_flags
&= ~(O_CREAT
| O_TRUNC
);
644 theFd
= ::open(theFileName
.c_str(), new_flags
, mode
);
647 request
->error
= errno
;
655 AsyncFile::readBuffer(Request
* req
, char * buf
, size_t size
, off_t offset
){
657 req
->par
.readWrite
.pages
[0].size
= 0;
659 DWORD dwSFP
= SetFilePointer(hFile
, offset
, 0, FILE_BEGIN
);
660 if(dwSFP
!= offset
) {
661 return GetLastError();
663 #elif ! defined(HAVE_PREAD)
665 while((seek_val
= lseek(theFd
, offset
, SEEK_SET
)) == (off_t
)-1
667 if(seek_val
== (off_t
)-1)
674 size_t bytes_read
= 0;
678 BOOL bRead
= ReadFile(hFile
,
684 return GetLastError();
686 bytes_read
= dwBytesRead
;
687 #elif ! defined(HAVE_PREAD)
688 return_value
= ::read(theFd
, buf
, size
);
690 return_value
= ::pread(theFd
, buf
, size
, offset
);
693 if (return_value
== -1 && errno
== EINTR
) {
694 DEBUG(ndbout_c("EINTR in read"));
696 } else if (return_value
== -1){
699 bytes_read
= return_value
;
703 req
->par
.readWrite
.pages
[0].size
+= bytes_read
;
705 if(req
->action
== Request::readPartial
)
709 DEBUG(ndbout_c("Read underflow %d %d\n %x\n%d %d",
710 size
, offset
, buf
, bytes_read
, return_value
));
711 return ERR_ReadUnderflow
;
714 if(bytes_read
!= size
){
715 DEBUG(ndbout_c("Warning partial read %d != %d",
721 offset
+= bytes_read
;
// Serves a read request page by page. For each of the
// request->par.readWrite.numberOfPages entries it takes the entry's offset,
// size and destination buffer and calls readBuffer().
// readBuffer()'s result is stored in request->error.
// NOTE(review): the condition guarding that store, and whether the loop
// stops at the first error, are not visible here.
727 AsyncFile::readReq( Request
* request
)
729 for(int i
= 0; i
< request
->par
.readWrite
.numberOfPages
; i
++) {
// Per-page parameters taken from the request.
730 off_t offset
= request
->par
.readWrite
.pages
[i
].offset
;
731 size_t size
= request
->par
.readWrite
.pages
[i
].size
;
732 char * buf
= request
->par
.readWrite
.pages
[i
].buf
;
734 int err
= readBuffer(request
, buf
, size
, offset
);
736 request
->error
= err
;
// Serves a vectored read. The readv() path builds one iovec per page of the
// request, seeks to the offset of page 0 and issues a single ::readv() for
// all pages.
// Outcome: request->error is set to errno when readv() returns -1, and to
// 1011 when fewer (or more) bytes than the summed page sizes were transferred.
// NOTE(review): the bodies of the "#if ! defined(HAVE_PREAD)" and
// "#elif defined NDB_WIN32" branches are not visible here; presumably they
// fall back to another read routine.
743 AsyncFile::readvReq( Request
* request
)
745 #if ! defined(HAVE_PREAD)
748 #elif defined NDB_WIN32
// NOTE(review): iov has a fixed capacity of 20; no check of numberOfPages
// against that bound is visible in this function.
755 struct iovec iov
[20]; // the parameter in the signal restricts this to 20 deep
// Fill the iovec array and accumulate the total number of bytes expected.
756 for(int i
=0; i
< request
->par
.readWrite
.numberOfPages
; i
++) {
757 iov
[i
].iov_base
= request
->par
.readWrite
.pages
[i
].buf
;
758 iov
[i
].iov_len
= request
->par
.readWrite
.pages
[i
].size
;
759 length
= length
+ iov
[i
].iov_len
;
// Only the offset of the first page is used; the pages are read back to back
// from there. NOTE(review): the result of lseek() is not checked.
761 lseek( theFd
, request
->par
.readWrite
.pages
[0].offset
, SEEK_SET
);
762 return_value
= ::readv(theFd
, iov
, request
->par
.readWrite
.numberOfPages
);
763 if (return_value
== -1) {
764 request
->error
= errno
;
// A transfer whose byte count differs from `length` is reported as error 1011.
766 } else if (return_value
!= length
) {
767 request
->error
= 1011;
// Extends the file with zero-filled blocks, for builds without pwrite().
// Within the "#if ! defined(HAVE_PWRITE)" section it finds the page with the
// largest offset in the request, allocates a zeroed buffer of that page's
// size, and writes that buffer at offsets 0, maxSize, 2*maxSize, ... up to
// and including maxOffset.
// The buffer is released with ndbd_free on both failure paths and after the
// loop. The other preprocessor branch only emits the "no pwrite" debug line.
// NOTE(review): the declarations of maxOffset, maxSize and return_value, the
// remaining lseek()/write() arguments and the return statements are not
// visible here.
774 AsyncFile::extendfile(Request
* request
) {
775 #if ! defined(HAVE_PWRITE)
776 // Find max size of this file in this request
779 for(int i
=0; i
< request
->par
.readWrite
.numberOfPages
; i
++) {
780 if (request
->par
.readWrite
.pages
[i
].offset
> maxOffset
) {
781 maxOffset
= request
->par
.readWrite
.pages
[i
].offset
;
782 maxSize
= request
->par
.readWrite
.pages
[i
].size
;
785 DEBUG(ndbout_c("extendfile: maxOffset=%d, size=%d", maxOffset
, maxSize
));
787 // Allocate a buffer and fill it with zeros
// NOTE(review): the result of ndbd_malloc is passed to memset without a
// visible NULL check.
788 void* pbuf
= ndbd_malloc(maxSize
);
789 memset(pbuf
, 0, maxSize
);
// NOTE(review): the loop advances by maxSize, so maxSize == 0 would never
// terminate; confirm that callers cannot pass a zero-sized page.
790 for (int p
= 0; p
<= maxOffset
; p
= p
+ maxSize
) {
792 return_value
= lseek(theFd
,
// The seek must land exactly on p; otherwise the buffer is freed
// (presumably followed by an error return, which is not visible here).
795 if((return_value
== -1 ) || (return_value
!= p
)) {
796 ndbd_free(pbuf
,maxSize
);
799 return_value
= ::write(theFd
,
// A short write is treated the same way as a failed write.
802 if ((return_value
== -1) || (return_value
!= maxSize
)) {
803 ndbd_free(pbuf
,maxSize
);
807 ndbd_free(pbuf
,maxSize
);
809 DEBUG(ndbout_c("extendfile: \"%s\" OK!", theFileName
.c_str()));
813 DEBUG(ndbout_c("no pwrite"));
820 AsyncFile::writeReq( Request
* request
)
823 bool write_not_complete
= true;
825 while(write_not_complete
) {
827 off_t offset
= request
->par
.readWrite
.pages
[page_num
].offset
;
828 char* bufptr
= theWriteBuffer
;
830 write_not_complete
= false;
831 if (request
->par
.readWrite
.numberOfPages
> 1) {
832 off_t page_offset
= offset
;
834 // Multiple page write, copy to buffer for one write
835 for(int i
=page_num
; i
< request
->par
.readWrite
.numberOfPages
; i
++) {
837 request
->par
.readWrite
.pages
[i
].buf
,
838 request
->par
.readWrite
.pages
[i
].size
);
839 bufptr
+= request
->par
.readWrite
.pages
[i
].size
;
840 totsize
+= request
->par
.readWrite
.pages
[i
].size
;
841 if (((i
+ 1) < request
->par
.readWrite
.numberOfPages
)) {
842 // There are more pages to write
843 // Check that offsets are consecutive
844 off_t tmp
= page_offset
+ request
->par
.readWrite
.pages
[i
].size
;
845 if (tmp
!= request
->par
.readWrite
.pages
[i
+1].offset
) {
846 // Next page is not aligned with previous, not allowed
847 DEBUG(ndbout_c("Page offsets are not aligned"));
848 request
->error
= EINVAL
;
851 if ((unsigned)(totsize
+ request
->par
.readWrite
.pages
[i
+1].size
) > (unsigned)theWriteBufferSize
) {
852 // We are not finished and the buffer is full
853 write_not_complete
= true;
854 // Start again with next page
859 page_offset
+= request
->par
.readWrite
.pages
[i
].size
;
861 bufptr
= theWriteBuffer
;
863 // One page write, write page directly
864 bufptr
= request
->par
.readWrite
.pages
[0].buf
;
865 totsize
= request
->par
.readWrite
.pages
[0].size
;
867 int err
= writeBuffer(bufptr
, totsize
, offset
);
869 request
->error
= err
;
872 } // while(write_not_complete)
874 if(m_auto_sync_freq
&& m_write_wo_sync
> m_auto_sync_freq
){
880 AsyncFile::writeBuffer(const char * buf
, size_t size
, off_t offset
,
883 size_t bytes_to_write
= chunk_size
;
886 m_write_wo_sync
+= size
;
889 DWORD dwSFP
= SetFilePointer(hFile
, offset
, 0, FILE_BEGIN
);
890 if(dwSFP
!= offset
) {
891 return GetLastError();
893 #elif ! defined(HAVE_PWRITE)
895 while((seek_val
= lseek(theFd
, offset
, SEEK_SET
)) == (off_t
)-1
897 if(seek_val
== (off_t
)-1)
904 if (size
< bytes_to_write
){
905 // We are at the last chunk
906 bytes_to_write
= size
;
908 size_t bytes_written
= 0;
912 BOOL bWrite
= WriteFile(hFile
, buf
, bytes_to_write
, &dwWritten
, 0);
914 return GetLastError();
916 bytes_written
= dwWritten
;
917 if (bytes_written
!= bytes_to_write
) {
918 DEBUG(ndbout_c("Warning partial write %d != %d", bytes_written
, bytes_to_write
));
921 #elif ! defined(HAVE_PWRITE)
922 return_value
= ::write(theFd
, buf
, bytes_to_write
);
924 return_value
= ::pwrite(theFd
, buf
, bytes_to_write
, offset
);
927 if (return_value
== -1 && errno
== EINTR
) {
929 DEBUG(ndbout_c("EINTR in write"));
930 } else if (return_value
== -1){
933 bytes_written
= return_value
;
935 if(bytes_written
== 0){
936 DEBUG(ndbout_c("no bytes written"));
940 if(bytes_written
!= bytes_to_write
){
941 DEBUG(ndbout_c("Warning partial write %d != %d",
942 bytes_written
, bytes_to_write
));
947 buf
+= bytes_written
;
948 size
-= bytes_written
;
949 offset
+= bytes_written
;
955 AsyncFile::writevReq( Request
* request
)
957 // WriteFileGather on WIN32?
// Closes the file belonging to this object.
// First a flag mask of OM_WRITEONLY | OM_READWRITE | OM_APPEND is tested;
// NOTE(review): the tested variable and the guarded action are not visible
// here, presumably a sync before closing writable files.
// Two close variants follow, presumably selected by a platform conditional
// that is not visible:
//   - CloseHandle(hFile): on failure request->error = GetLastError(); hFile
//     is then reset to INVALID_HANDLE_VALUE.
//   - ::close(theFd): on failure request->error = errno.
963 AsyncFile::closeReq(Request
* request
)
966 FsOpenReq::OM_WRITEONLY
|
967 FsOpenReq::OM_READWRITE
|
968 FsOpenReq::OM_APPEND
)) {
972 if(!CloseHandle(hFile
)) {
973 request
->error
= GetLastError();
// Mark the handle as closed so isOpen() reports false afterwards.
975 hFile
= INVALID_HANDLE_VALUE
;
977 if (-1 == ::close(theFd
)) {
980 DEBUG(ndbout_c("close on fd = -1"));
984 request
->error
= errno
;
// Returns true when the object currently holds an open file.
// Two variants are visible, presumably selected by a platform conditional
// that is not shown: the handle-based one compares hFile with
// INVALID_HANDLE_VALUE, the descriptor-based one compares theFd with -1.
990 bool AsyncFile::isOpen(){
992 return (hFile
!= INVALID_HANDLE_VALUE
);
994 return (theFd
!= -1);
// Flushes the file to stable storage and resets the unsynced-bytes counter.
// The flush uses FlushFileBuffers(hFile) or ::fsync(theFd); which one applies
// is presumably decided by a platform conditional that is not visible.
// On failure request->error receives GetLastError() or errno respectively.
// m_write_wo_sync is set back to 0 at the end.
1000 AsyncFile::syncReq(Request
* request
)
// With auto-sync enabled and nothing written since the last sync, a special
// case applies. NOTE(review): its body is not visible, presumably an early
// return.
1002 if(m_auto_sync_freq
&& m_write_wo_sync
== 0){
1006 if(!FlushFileBuffers(hFile
)) {
1007 request
->error
= GetLastError();
1011 if (-1 == ::fsync(theFd
)){
1012 request
->error
= errno
;
1016 m_write_wo_sync
= 0;
// Appends request->par.append.size bytes from request->par.append.buf to
// the file.
// The requested size is added to m_write_wo_sync before any write is
// attempted, so the counter tracks bytes requested, not bytes confirmed.
// Two write variants are visible, presumably selected by a platform
// conditional that is not shown:
//   - WriteFile(hFile, ...): on failure request->error = GetLastError().
//   - write(theFd, ...): EINTR is handled separately, other failures set
//     request->error = errno, and a zero-byte write emits a debug line.
// NOTE(review): the loop structure around the writes and the action taken by
// the final auto-sync check are not visible here.
1020 AsyncFile::appendReq(Request
* request
){
1022 const char * buf
= request
->par
.append
.buf
;
1023 Uint32 size
= request
->par
.append
.size
;
1025 m_write_wo_sync
+= size
;
1028 DWORD dwWritten
= 0;
1030 if(!WriteFile(hFile
, buf
, size
, &dwWritten
, 0)){
1031 request
->error
= GetLastError();
1040 const int n
= write(theFd
, buf
, size
);
// An interrupted write is singled out from real errors (handling not visible).
1041 if(n
== -1 && errno
== EINTR
){
1045 request
->error
= errno
;
1049 DEBUG(ndbout_c("append with n=0"));
// Auto-sync threshold check: more unsynced bytes than m_auto_sync_freq.
1057 if(m_auto_sync_freq
&& m_write_wo_sync
> m_auto_sync_freq
){
// Deletes the file named by theFileName.
// Two variants are visible, presumably selected by a platform conditional
// that is not shown: DeleteFile() reporting GetLastError(), and ::remove()
// reporting errno. In both, the error code is stored in request->error and
// nothing is written on success.
1063 AsyncFile::removeReq(Request
* request
)
1066 if(!DeleteFile(theFileName
.c_str())) {
1067 request
->error
= GetLastError();
1070 if (-1 == ::remove(theFileName
.c_str())) {
1071 request
->error
= errno
;
1078 AsyncFile::rmrfReq(Request
* request
, char * path
, bool removePath
){
1079 Uint32 path_len
= strlen(path
);
1080 Uint32 path_max_copy
= PATH_MAX
- path_len
;
1081 char* path_add
= &path
[path_len
];
1083 if(!request
->par
.rmrf
.directory
){
1085 if(unlink((const char *)path
) != 0 && errno
!= ENOENT
)
1086 request
->error
= errno
;
1090 DIR* dirp
= opendir((const char *)path
);
1093 request
->error
= errno
;
1097 while ((dp
= readdir(dirp
)) != NULL
){
1098 if ((strcmp(".", dp
->d_name
) != 0) && (strcmp("..", dp
->d_name
) != 0)) {
1099 BaseString::snprintf(path_add
, (size_t)path_max_copy
, "%s%s",
1100 DIR_SEPARATOR
, dp
->d_name
);
1101 if(remove((const char*)path
) == 0){
1106 rmrfReq(request
, path
, true);
1108 if(request
->error
!= 0){
1115 if(removePath
&& rmdir((const char *)path
) != 0){
1116 request
->error
= errno
;
1121 if(!request
->par
.rmrf
.directory
){
1123 if(!DeleteFile(path
)){
1124 DWORD dwError
= GetLastError();
1125 if(dwError
!=ERROR_FILE_NOT_FOUND
)
1126 request
->error
= dwError
;
1131 strcat(path
, "\\*");
1132 WIN32_FIND_DATA ffd
;
1133 HANDLE hFindFile
= FindFirstFile(path
, &ffd
);
1135 if(INVALID_HANDLE_VALUE
==hFindFile
){
1136 DWORD dwError
= GetLastError();
1137 if(dwError
!=ERROR_PATH_NOT_FOUND
)
1138 request
->error
= dwError
;
1143 if(0!=strcmp(".", ffd
.cFileName
) && 0!=strcmp("..", ffd
.cFileName
)){
1145 strcat(path
, ffd
.cFileName
);
1146 if(DeleteFile(path
)) {
1151 rmrfReq(request
, path
, true);
1153 if(request
->error
!= 0){
1154 FindClose(hFindFile
);
1158 } while(FindNextFile(hFindFile
, &ffd
));
1160 FindClose(hFindFile
);
1162 if(removePath
&& !RemoveDirectory(path
))
1163 request
->error
= GetLastError();
// Releases the write buffer when the file thread ends. The pointer freed is
// theWriteBufferUnaligned, i.e. the address originally returned by
// ndbd_malloc, not the aligned theWriteBuffer derived from it.
// NOTE(review): the buffer was allocated with
// theWriteBufferSize + NDB_O_DIRECT_WRITE_ALIGNMENT - 1 bytes, but
// theWriteBufferSize is passed to ndbd_free here; confirm whether ndbd_free
// relies on the size matching the allocation.
1168 void AsyncFile::endReq()
1170 // Thread is ended with return
1171 if (theWriteBufferUnaligned
)
1172 ndbd_free(theWriteBufferUnaligned
, theWriteBufferSize
);
// Creates the directories along the path in theFileName.
// Starting at theFileName.get_base_name(), it searches repeatedly for the
// next DIR_SEPARATOR and, for each one found, creates a directory for `name`.
// Creation uses CreateDirectory() or mkdir() with mode rwx for the owner and
// r-x for the group; the choice is presumably a platform conditional that is
// not visible. The return values of both calls are ignored, so an already
// existing directory is not treated as an error.
// NOTE(review): the statements that temporarily terminate the string at the
// separator, and the declaration of tmp, are not visible here.
1176 void AsyncFile::createDirectories()
1179 const char * name
= theFileName
.c_str();
1180 const char * base
= theFileName
.get_base_name();
1181 while((tmp
= (char *)strstr(base
, DIR_SEPARATOR
)))
1186 CreateDirectory(name
, 0);
1188 mkdir(name
, S_IRUSR
| S_IWUSR
| S_IXUSR
| S_IXGRP
| S_IRGRP
);
// NOTE(review): if DIR_SEPARATOR is a string literal, sizeof() includes the
// terminating NUL, so this advances one character further than the
// separator itself; confirm that this is intended.
1191 base
= tmp
+ sizeof(DIR_SEPARATOR
);
1195 #ifdef DEBUG_ASYNCFILE
1196 void printErrorAndFlags(Uint32 used_flags
) {
1198 sprintf(buf
, "PEAF: errno=%d \"", errno
);
1202 strcat(buf
, "EACCES");
1205 strcat(buf
, "EDQUOT");
1208 strcat(buf
, "EEXIST");
1211 strcat(buf
, "EINTR");
1214 strcat(buf
, "EFAULT");
1220 strcat(buf
, "EISDIR");
1223 strcat(buf
, "ELOOP");
1226 strcat(buf
, "EMFILE");
1229 strcat(buf
, "ENFILE");
1232 strcat(buf
, "ENOENT ");
1235 strcat(buf
, "ENOSPC");
1238 strcat(buf
, "ENOTDIR");
1241 strcat(buf
, "ENXIO");
1244 strcat(buf
, "EOPNOTSUPP");
1247 strcat(buf
, "EMULTIHOP");
1250 strcat(buf
, "ENOLINK");
1253 strcat(buf
, "ENOSR");
1256 strcat(buf
, "EOVERFLOW");
1259 strcat(buf
, "EROFS");
1262 strcat(buf
, "EAGAIN");
1265 strcat(buf
, "EINVAL");
1268 strcat(buf
, "ENOMEM");
1271 strcat(buf
, "ETXTBSY");
1274 strcat(buf
, "ENAMETOOLONG");
1277 strcat(buf
, "EBADF");
1280 strcat(buf
, "ESPIPE");
1283 strcat(buf
, "ESTALE");
1286 strcat(buf
, "EOTHER");
1290 strcat(buf
, " flags: ");
1291 switch(used_flags
& 3){
1293 strcat(buf
, "O_RDONLY, ");
1296 strcat(buf
, "O_WRONLY, ");
1299 strcat(buf
, "O_RDWR, ");
1302 strcat(buf
, "Unknown!!, ");
1305 if((used_flags
& O_APPEND
)==O_APPEND
)
1306 strcat(buf
, "O_APPEND, ");
1307 if((used_flags
& O_CREAT
)==O_CREAT
)
1308 strcat(buf
, "O_CREAT, ");
1309 if((used_flags
& O_EXCL
)==O_EXCL
)
1310 strcat(buf
, "O_EXCL, ");
1311 if((used_flags
& O_NOCTTY
) == O_NOCTTY
)
1312 strcat(buf
, "O_NOCTTY, ");
1313 if((used_flags
& O_NONBLOCK
)==O_NONBLOCK
)
1314 strcat(buf
, "O_NONBLOCK, ");
1315 if((used_flags
& O_TRUNC
)==O_TRUNC
)
1316 strcat(buf
, "O_TRUNC, ");
1317 if((used_flags
& O_DSYNC
)==O_DSYNC
)
1318 strcat(buf
, "O_DSYNC, ");
1319 if((used_flags
& O_NDELAY
)==O_NDELAY
)
1320 strcat(buf
, "O_NDELAY, ");
1321 if((used_flags
& O_RSYNC
)==O_RSYNC
)
1322 strcat(buf
, "O_RSYNC, ");
1324 if((used_flags
& O_SYNC
)==O_SYNC
)
1325 strcat(buf
, "O_SYNC, ");
1327 DEBUG(ndbout_c(buf
));
1333 operator<<(NdbOut
& out
, const Request
& req
)
1335 out
<< "[ Request: file: " << hex
<< req
.file
1336 << " userRef: " << hex
<< req
.theUserReference
1337 << " userData: " << dec
<< req
.theUserPointer
1338 << " theFilePointer: " << req
.theFilePointer
1344 case Request::close
:
1347 case Request::closeRemove
:
1348 out
<< "closeRemove";
1350 case Request::read
: // Always leave readv directly after
1353 case Request::readv
:
1356 case Request::write
:// Always leave writev directly after
1359 case Request::writev
:
1362 case Request::writeSync
:// Always leave writevSync directly after
1365 // writeSync because SimblockAsyncFileSystem depends on it
1366 case Request::writevSync
:
1367 out
<< "writevSync";
1375 case Request::append
:
1382 out
<< (Uint32
)req
.action
;