1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4 #if !defined(LEVELDB_PLATFORM_WINDOWS)
14 #include <sys/resource.h>
17 #include <sys/types.h>
23 #include "leveldb/env.h"
24 #include "leveldb/slice.h"
25 #include "port/port.h"
26 #include "util/logging.h"
27 #include "util/mutexlock.h"
28 #include "util/posix_logger.h"
29 #include "util/env_posix_test_helper.h"
// Cap on the number of read-only file descriptors kept open.  A negative
// value means no cap has been chosen yet; MaxOpenFiles() then computes one.
static int open_read_only_file_limit = -1;
// Cap on the number of concurrent mmaps.  A negative value means no cap has
// been chosen yet; MaxMmaps() then computes one.
static int mmap_limit = -1;
38 static Status
IOError(const std::string
& context
, int err_number
) {
39 return Status::IOError(context
, strerror(err_number
));
42 // Helper class to limit resource usage to avoid exhaustion.
43 // Currently used to limit read-only file descriptors and mmap file usage
44 // so that we do not end up running out of file descriptors, virtual memory,
45 // or running into kernel performance problems for very large databases.
48 // Limit maximum number of resources to |n|.
53 // If another resource is available, acquire it and return true.
56 if (GetAllowed() <= 0) {
60 intptr_t x
= GetAllowed();
69 // Release a resource acquired by a previous call to Acquire() that returned true.
73 SetAllowed(GetAllowed() + 1);
78 port::AtomicPointer allowed_
;
80 intptr_t GetAllowed() const {
81 return reinterpret_cast<intptr_t>(allowed_
.Acquire_Load());
84 // REQUIRES: mu_ must be held
85 void SetAllowed(intptr_t v
) {
86 allowed_
.Release_Store(reinterpret_cast<void*>(v
));
89 Limiter(const Limiter
&);
90 void operator=(const Limiter
&);
93 class PosixSequentialFile
: public SequentialFile
{
95 std::string filename_
;
99 PosixSequentialFile(const std::string
& fname
, FILE* f
)
100 : filename_(fname
), file_(f
) { }
101 virtual ~PosixSequentialFile() { fclose(file_
); }
103 virtual Status
Read(size_t n
, Slice
* result
, char* scratch
) {
105 size_t r
= fread_unlocked(scratch
, 1, n
, file_
);
106 *result
= Slice(scratch
, r
);
109 // We leave status as ok if we hit the end of the file
111 // A partial read with an error: return a non-ok status
112 s
= IOError(filename_
, errno
);
118 virtual Status
Skip(uint64_t n
) {
119 if (fseek(file_
, n
, SEEK_CUR
)) {
120 return IOError(filename_
, errno
);
126 // pread() based random-access
127 class PosixRandomAccessFile
: public RandomAccessFile
{
129 std::string filename_
;
130 bool temporary_fd_
; // If true, fd_ is -1 and we open on every read.
135 PosixRandomAccessFile(const std::string
& fname
, int fd
, Limiter
* limiter
)
136 : filename_(fname
), fd_(fd
), limiter_(limiter
) {
137 temporary_fd_
= !limiter
->Acquire();
139 // Open file on every access.
145 virtual ~PosixRandomAccessFile() {
146 if (!temporary_fd_
) {
152 virtual Status
Read(uint64_t offset
, size_t n
, Slice
* result
,
153 char* scratch
) const {
156 fd
= open(filename_
.c_str(), O_RDONLY
);
158 return IOError(filename_
, errno
);
163 ssize_t r
= pread(fd
, scratch
, n
, static_cast<off_t
>(offset
));
164 *result
= Slice(scratch
, (r
< 0) ? 0 : r
);
166 // An error: return a non-ok status
167 s
= IOError(filename_
, errno
);
170 // Close the temporary file descriptor opened earlier.
177 // mmap() based random-access
178 class PosixMmapReadableFile
: public RandomAccessFile
{
180 std::string filename_
;
181 void* mmapped_region_
;
186 // base[0,length-1] contains the mmapped contents of the file.
187 PosixMmapReadableFile(const std::string
& fname
, void* base
, size_t length
,
189 : filename_(fname
), mmapped_region_(base
), length_(length
),
193 virtual ~PosixMmapReadableFile() {
194 munmap(mmapped_region_
, length_
);
198 virtual Status
Read(uint64_t offset
, size_t n
, Slice
* result
,
199 char* scratch
) const {
201 if (offset
+ n
> length_
) {
203 s
= IOError(filename_
, EINVAL
);
205 *result
= Slice(reinterpret_cast<char*>(mmapped_region_
) + offset
, n
);
211 class PosixWritableFile
: public WritableFile
{
213 std::string filename_
;
217 PosixWritableFile(const std::string
& fname
, FILE* f
)
218 : filename_(fname
), file_(f
) { }
220 ~PosixWritableFile() {
222 // Ignoring any potential errors
227 virtual Status
Append(const Slice
& data
) {
228 size_t r
= fwrite_unlocked(data
.data(), 1, data
.size(), file_
);
229 if (r
!= data
.size()) {
230 return IOError(filename_
, errno
);
235 virtual Status
Close() {
237 if (fclose(file_
) != 0) {
238 result
= IOError(filename_
, errno
);
244 virtual Status
Flush() {
245 if (fflush_unlocked(file_
) != 0) {
246 return IOError(filename_
, errno
);
251 Status
SyncDirIfManifest() {
252 const char* f
= filename_
.c_str();
253 const char* sep
= strrchr(f
, '/');
260 dir
= std::string(f
, sep
- f
);
264 if (basename
.starts_with("MANIFEST")) {
265 int fd
= open(dir
.c_str(), O_RDONLY
);
267 s
= IOError(dir
, errno
);
269 if (fsync(fd
) < 0 && errno
!= EINVAL
) {
270 s
= IOError(dir
, errno
);
278 virtual Status
Sync() {
279 // Ensure new files referred to by the manifest are in the filesystem.
280 Status s
= SyncDirIfManifest();
284 if (fflush_unlocked(file_
) != 0 ||
285 fdatasync(fileno(file_
)) != 0) {
286 s
= Status::IOError(filename_
, strerror(errno
));
// Places (|lock| == true) or removes (|lock| == false) a write lock covering
// the whole file referred to by |fd|, using fcntl(F_SETLK).
// Returns the fcntl() result unchanged; callers compare it against -1 and
// read errno on failure.
static int LockOrUnlock(int fd, bool lock) {
  struct flock f;
  memset(&f, 0, sizeof(f));
  f.l_type = (lock ? F_WRLCK : F_UNLCK);
  f.l_whence = SEEK_SET;
  f.l_start = 0;  // Range starts at the beginning of the file
  f.l_len = 0;    // Lock/unlock entire file
  return fcntl(fd, F_SETLK, &f);
}
303 class PosixFileLock
: public FileLock
{
309 // Set of locked files. We keep a separate set instead of just
310 // relying on fcntl(F_SETLK) since fcntl(F_SETLK) does not provide
311 // any protection against multiple uses from the same process.
312 class PosixLockTable
{
315 std::set
<std::string
> locked_files_
;
317 bool Insert(const std::string
& fname
) {
319 return locked_files_
.insert(fname
).second
;
321 void Remove(const std::string
& fname
) {
323 locked_files_
.erase(fname
);
327 class PosixEnv
: public Env
{
330 virtual ~PosixEnv() {
331 char msg
[] = "Destroying Env::Default()\n";
332 fwrite(msg
, 1, sizeof(msg
), stderr
);
336 virtual Status
NewSequentialFile(const std::string
& fname
,
337 SequentialFile
** result
) {
338 FILE* f
= fopen(fname
.c_str(), "r");
341 return IOError(fname
, errno
);
343 *result
= new PosixSequentialFile(fname
, f
);
348 virtual Status
NewRandomAccessFile(const std::string
& fname
,
349 RandomAccessFile
** result
) {
352 int fd
= open(fname
.c_str(), O_RDONLY
);
354 s
= IOError(fname
, errno
);
355 } else if (mmap_limit_
.Acquire()) {
357 s
= GetFileSize(fname
, &size
);
359 void* base
= mmap(NULL
, size
, PROT_READ
, MAP_SHARED
, fd
, 0);
360 if (base
!= MAP_FAILED
) {
361 *result
= new PosixMmapReadableFile(fname
, base
, size
, &mmap_limit_
);
363 s
= IOError(fname
, errno
);
368 mmap_limit_
.Release();
371 *result
= new PosixRandomAccessFile(fname
, fd
, &fd_limit_
);
376 virtual Status
NewWritableFile(const std::string
& fname
,
377 WritableFile
** result
) {
379 FILE* f
= fopen(fname
.c_str(), "w");
382 s
= IOError(fname
, errno
);
384 *result
= new PosixWritableFile(fname
, f
);
389 virtual Status
NewAppendableFile(const std::string
& fname
,
390 WritableFile
** result
) {
392 FILE* f
= fopen(fname
.c_str(), "a");
395 s
= IOError(fname
, errno
);
397 *result
= new PosixWritableFile(fname
, f
);
402 virtual bool FileExists(const std::string
& fname
) {
403 return access(fname
.c_str(), F_OK
) == 0;
406 virtual Status
GetChildren(const std::string
& dir
,
407 std::vector
<std::string
>* result
) {
409 DIR* d
= opendir(dir
.c_str());
411 return IOError(dir
, errno
);
413 struct dirent
* entry
;
414 while ((entry
= readdir(d
)) != NULL
) {
415 result
->push_back(entry
->d_name
);
421 virtual Status
DeleteFile(const std::string
& fname
) {
423 if (unlink(fname
.c_str()) != 0) {
424 result
= IOError(fname
, errno
);
429 virtual Status
CreateDir(const std::string
& name
) {
431 if (mkdir(name
.c_str(), 0755) != 0) {
432 result
= IOError(name
, errno
);
437 virtual Status
DeleteDir(const std::string
& name
) {
439 if (rmdir(name
.c_str()) != 0) {
440 result
= IOError(name
, errno
);
445 virtual Status
GetFileSize(const std::string
& fname
, uint64_t* size
) {
448 if (stat(fname
.c_str(), &sbuf
) != 0) {
450 s
= IOError(fname
, errno
);
452 *size
= sbuf
.st_size
;
457 virtual Status
RenameFile(const std::string
& src
, const std::string
& target
) {
459 if (rename(src
.c_str(), target
.c_str()) != 0) {
460 result
= IOError(src
, errno
);
465 virtual Status
LockFile(const std::string
& fname
, FileLock
** lock
) {
468 int fd
= open(fname
.c_str(), O_RDWR
| O_CREAT
, 0644);
470 result
= IOError(fname
, errno
);
471 } else if (!locks_
.Insert(fname
)) {
473 result
= Status::IOError("lock " + fname
, "already held by process");
474 } else if (LockOrUnlock(fd
, true) == -1) {
475 result
= IOError("lock " + fname
, errno
);
477 locks_
.Remove(fname
);
479 PosixFileLock
* my_lock
= new PosixFileLock
;
481 my_lock
->name_
= fname
;
487 virtual Status
UnlockFile(FileLock
* lock
) {
488 PosixFileLock
* my_lock
= reinterpret_cast<PosixFileLock
*>(lock
);
490 if (LockOrUnlock(my_lock
->fd_
, false) == -1) {
491 result
= IOError("unlock", errno
);
493 locks_
.Remove(my_lock
->name_
);
499 virtual void Schedule(void (*function
)(void*), void* arg
);
501 virtual void StartThread(void (*function
)(void* arg
), void* arg
);
503 virtual Status
GetTestDirectory(std::string
* result
) {
504 const char* env
= getenv("TEST_TMPDIR");
505 if (env
&& env
[0] != '\0') {
509 snprintf(buf
, sizeof(buf
), "/tmp/leveldbtest-%d", int(geteuid()));
512 // Directory may already exist
// Returns an identifier for the calling thread: the leading bytes of its
// pthread_t (at most sizeof(uint64_t) of them) copied into a zero-initialised
// 64-bit integer.
static uint64_t gettid() {
  pthread_t tid = pthread_self();
  uint64_t thread_id = 0;
  memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
  return thread_id;
}
524 virtual Status
NewLogger(const std::string
& fname
, Logger
** result
) {
525 FILE* f
= fopen(fname
.c_str(), "w");
528 return IOError(fname
, errno
);
530 *result
= new PosixLogger(f
, &PosixEnv::gettid
);
535 virtual uint64_t NowMicros() {
537 gettimeofday(&tv
, NULL
);
538 return static_cast<uint64_t>(tv
.tv_sec
) * 1000000 + tv
.tv_usec
;
541 virtual void SleepForMicroseconds(int micros
) {
546 void PthreadCall(const char* label
, int result
) {
548 fprintf(stderr
, "pthread %s: %s\n", label
, strerror(result
));
553 // BGThread() is the body of the background thread
555 static void* BGThreadWrapper(void* arg
) {
556 reinterpret_cast<PosixEnv
*>(arg
)->BGThread();
561 pthread_cond_t bgsignal_
;
563 bool started_bgthread_
;
// Entry per Schedule() call
struct BGItem {
  void* arg;                // Argument handed to |function|
  void (*function)(void*);  // Work to run on the background thread
};
// Pending work items; Schedule() appends at the back and BGThread() reads
// from the front.
typedef std::deque<BGItem> BGQueue;
570 PosixLockTable locks_
;
575 // Return the maximum number of concurrent mmaps.
576 static int MaxMmaps() {
577 if (mmap_limit
>= 0) {
580 // Up to 1000 mmaps for 64-bit binaries; none for smaller pointer sizes.
581 mmap_limit
= sizeof(void*) >= 8 ? 1000 : 0;
585 // Return the maximum number of read-only files to keep open.
586 static intptr_t MaxOpenFiles() {
587 if (open_read_only_file_limit
>= 0) {
588 return open_read_only_file_limit
;
591 if (getrlimit(RLIMIT_NOFILE
, &rlim
)) {
592 // getrlimit failed, fallback to hard-coded default.
593 open_read_only_file_limit
= 50;
594 } else if (rlim
.rlim_cur
== RLIM_INFINITY
) {
595 open_read_only_file_limit
= std::numeric_limits
<int>::max();
597 // Allow use of 20% of available file descriptors for read-only files.
598 open_read_only_file_limit
= rlim
.rlim_cur
/ 5;
600 return open_read_only_file_limit
;
604 : started_bgthread_(false),
605 mmap_limit_(MaxMmaps()),
606 fd_limit_(MaxOpenFiles()) {
607 PthreadCall("mutex_init", pthread_mutex_init(&mu_
, NULL
));
608 PthreadCall("cvar_init", pthread_cond_init(&bgsignal_
, NULL
));
611 void PosixEnv::Schedule(void (*function
)(void*), void* arg
) {
612 PthreadCall("lock", pthread_mutex_lock(&mu_
));
614 // Start background thread if necessary
615 if (!started_bgthread_
) {
616 started_bgthread_
= true;
619 pthread_create(&bgthread_
, NULL
, &PosixEnv::BGThreadWrapper
, this));
622 // If the queue is empty, the background thread may be waiting: wake it.
624 if (queue_
.empty()) {
625 PthreadCall("signal", pthread_cond_signal(&bgsignal_
));
628 // Append to the back of the work queue (items run in FIFO order)
629 queue_
.push_back(BGItem());
630 queue_
.back().function
= function
;
631 queue_
.back().arg
= arg
;
633 PthreadCall("unlock", pthread_mutex_unlock(&mu_
));
636 void PosixEnv::BGThread() {
638 // Wait until there is an item that is ready to run
639 PthreadCall("lock", pthread_mutex_lock(&mu_
));
640 while (queue_
.empty()) {
641 PthreadCall("wait", pthread_cond_wait(&bgsignal_
, &mu_
));
644 void (*function
)(void*) = queue_
.front().function
;
645 void* arg
= queue_
.front().arg
;
648 PthreadCall("unlock", pthread_mutex_unlock(&mu_
));
654 struct StartThreadState
{
655 void (*user_function
)(void*);
659 static void* StartThreadWrapper(void* arg
) {
660 StartThreadState
* state
= reinterpret_cast<StartThreadState
*>(arg
);
661 state
->user_function(state
->arg
);
666 void PosixEnv::StartThread(void (*function
)(void* arg
), void* arg
) {
668 StartThreadState
* state
= new StartThreadState
;
669 state
->user_function
= function
;
671 PthreadCall("start thread",
672 pthread_create(&t
, NULL
, &StartThreadWrapper
, state
));
677 static pthread_once_t once
= PTHREAD_ONCE_INIT
;
678 static Env
* default_env
;
679 static void InitDefaultEnv() { default_env
= new PosixEnv
; }
681 void EnvPosixTestHelper::SetReadOnlyFDLimit(int limit
) {
682 assert(default_env
== NULL
);
683 open_read_only_file_limit
= limit
;
686 void EnvPosixTestHelper::SetReadOnlyMMapLimit(int limit
) {
687 assert(default_env
== NULL
);
691 Env
* Env::Default() {
692 pthread_once(&once
, InitDefaultEnv
);
696 } // namespace leveldb