From a866a593497efc3b476343726f9903bd27a63e5b Mon Sep 17 00:00:00 2001 From: Keith Rarick Date: Mon, 28 Sep 2009 19:13:40 -0700 Subject: [PATCH] Reserve space for certain events ahead of time. Delete records are appended to the binlog just like everything else. That means if there's no disk space available, the user won't be able to delete jobs in an attempt to free up disk space. So we allocate and reserve space ahead of time for deleting a job. Also quite a bit of refactoring in this commit. --- beanstalkd.c | 7 +- binlog.c | 350 ++++++++++++++++++++++++++++++++++++++--------------------- binlog.h | 13 +-- 3 files changed, 231 insertions(+), 139 deletions(-) diff --git a/beanstalkd.c b/beanstalkd.c index 9e7f18f..625fef0 100644 --- a/beanstalkd.c +++ b/beanstalkd.c @@ -98,7 +98,7 @@ su(const char *user) { void exit_cleanly(int sig) { - binlog_close(); + binlog_shutdown(); exit(0); } @@ -287,8 +287,7 @@ main(int argc, char **argv) unbrake((evh) h_accept); binlog_jobs.prev = binlog_jobs.next = &binlog_jobs; - binlog_read(&binlog_jobs); - binlog_init(); + binlog_init(&binlog_jobs); prot_replay_binlog(&binlog_jobs); if (detach) { @@ -297,7 +296,7 @@ main(int argc, char **argv) } event_dispatch(); - binlog_close(); + binlog_shutdown(); twarnx("got here for some reason"); return 0; } diff --git a/binlog.c b/binlog.c index 3368e54..3568774 100644 --- a/binlog.c +++ b/binlog.c @@ -36,19 +36,28 @@ #include "util.h" #include "config.h" +typedef struct binlog *binlog; + +struct binlog { + binlog next; + unsigned int refs; + int fd; + size_t free; + size_t reserved; + char path[]; +}; + /* max size we will create a log file */ size_t binlog_size_limit = 10 << 20; char *binlog_dir = NULL; static int binlog_index = 0; -static int binlog_fd = -1, next_binlog_fd = -1; static int binlog_version = 2; -static size_t binlog_space = 0, next_binlog_space = 0; -static size_t binlog_reserved = 0, next_binlog_reserved = 0; -static size_t bytes_written; static int lock_fd; -static binlog first_binlog = NULL, last_binlog = NULL, next_binlog = NULL; +static binlog oldest_binlog = 0, + current_binlog = 0, + newest_binlog = 0; static int binlog_scan_dir() @@ -81,14 +90,13 @@ binlog_scan_dir() } static void -binlog_remove_first() +binlog_remove_oldest() { - binlog b = first_binlog; + binlog b = oldest_binlog; if (!b) return; - first_binlog = b->next; - if (!first_binlog) last_binlog = NULL; + oldest_binlog = b->next; unlink(b->path); free(b); @@ -109,7 +117,9 @@ binlog_dref(binlog b) --b->refs; if (b->refs < 1) { - while (first_binlog && first_binlog->refs == 0) binlog_remove_first(); + while (oldest_binlog && oldest_binlog->refs == 0) { + binlog_remove_oldest(); + } } } @@ -122,13 +132,13 @@ binlog_warn(int fd, const char* path, const char *msg) } */ -#define binlog_warn(fd, path, fmt, args...) \ +#define binlog_warn(b, fmt, args...) \ warnx("WARNING, " fmt " at %s:%u. %s: ", \ - ##args, path, lseek(fd, 0, SEEK_CUR), \ + ##args, b->path, lseek(b->fd, 0, SEEK_CUR), \ "Continuing. You may be missing data.") static void -binlog_read_one(int fd, job binlog_jobs, const char *path) +binlog_read_log_file(binlog b, job binlog_jobs) { struct job js; tube t; @@ -138,32 +148,32 @@ binlog_read_one(int fd, job binlog_jobs, const char *path) ssize_t r; int version; - r = read(fd, &version, sizeof(version)); + r = read(b->fd, &version, sizeof(version)); if (r == -1) return twarn("read()"); if (r < sizeof(version)) { - return binlog_warn(fd, path, "EOF while reading version record"); + return binlog_warn(b, "EOF while reading version record"); } if (version != binlog_version) { - return warnx("%s: binlog version mismatch %d %d", path, version, + return warnx("%s: binlog version mismatch %d %d", b->path, version, binlog_version); } - while (read(fd, &namelen, sizeof(size_t)) == sizeof(size_t)) { + while (read(b->fd, &namelen, sizeof(size_t)) == sizeof(size_t)) { if (namelen > 0) { - r = read(fd, tubename, namelen); + r = read(b->fd, tubename, namelen); if (r == -1) return twarn("read()"); if (r < namelen) { - lseek(fd, SEEK_CUR, 0); - return binlog_warn(fd, path, "EOF while reading tube name"); + lseek(b->fd, SEEK_CUR, 0); + return binlog_warn(b, "EOF while reading tube name"); } } tubename[namelen] = '\0'; - r = read(fd, &js, sizeof(struct job)); + r = read(b->fd, &js, sizeof(struct job)); if (r == -1) return twarn("read()"); if (r < sizeof(struct job)) { - return binlog_warn(fd, path, "EOF while reading job record"); + return binlog_warn(b, "EOF while reading job record"); } if (!js.id) break; @@ -195,16 +205,16 @@ binlog_read_one(int fd, job binlog_jobs, const char *path) job_remove(j); binlog_dref(j->binlog); job_free(j); - return binlog_warn(fd, path, "EOF while reading job body"); + return binlog_warn(b, "EOF while reading job body"); } - r = read(fd, j->body, js.body_size); + r = read(b->fd, j->body, js.body_size); if (r == -1) return twarn("read()"); if (r < js.body_size) { warnx("dropping incomplete job %llu", j->id); job_remove(j); binlog_dref(j->binlog); job_free(j); - return binlog_warn(fd, path, "EOF while reading job body"); + return binlog_warn(b, "EOF while reading job body"); } } break; @@ -223,19 +233,20 @@ binlog_read_one(int fd, job binlog_jobs, const char *path) /* this is a complete record, so we can move the binlog ref */ if (namelen && js.body_size) { binlog_dref(j->binlog); - j->binlog = binlog_iref(last_binlog); + j->binlog = binlog_iref(b); } } } } static void -binlog_close_last() +binlog_close(binlog b) { - if (binlog_fd < 0) return; - close(binlog_fd); - binlog_dref(last_binlog); - binlog_fd = -1; + if (!b) return; + if (b->fd < 0) return; + close(b->fd); + binlog_dref(b); + b->fd = -1; } static binlog @@ -248,6 +259,9 @@ make_binlog(char *path) strcpy(b->path, path); b->refs = 0; b->next = NULL; + b->fd = -1; + b->free = 0; + b->reserved = 0; return b; } @@ -268,23 +282,24 @@ make_next_binlog() static binlog add_binlog(binlog b) { - if (last_binlog) last_binlog->next = b; - last_binlog = b; - if (!first_binlog) first_binlog = b; + if (newest_binlog) newest_binlog->next = b; + newest_binlog = b; + if (!oldest_binlog) oldest_binlog = b; return b; } -static int -binlog_open(binlog log) +static void +binlog_open(binlog log, size_t *written) { int fd; + size_t bytes_written; - if (!binlog_iref(log)) return -1; + if (!binlog_iref(log)) return; fd = open(log->path, O_WRONLY | O_CREAT, 0400); - if (fd < 0) return twarn("Cannot open binlog %s", log->path), -1; + if (fd < 0) return twarn("Cannot open binlog %s", log->path); #ifdef HAVE_POSIX_FALLOCATE { @@ -294,7 +309,7 @@ binlog_open(binlog log) close(fd); binlog_dref(log); errno = r; - return twarn("Cannot allocate space for binlog %s", log->path), -1; + return twarn("Cannot allocate space for binlog %s", log->path); } } #else @@ -312,7 +327,7 @@ binlog_open(binlog log) twarn("Cannot allocate space for binlog %s", log->path); close(fd); binlog_dref(log); - return -1; + return; } } @@ -321,7 +336,7 @@ binlog_open(binlog log) twarn("lseek"); close(fd); binlog_dref(log); - return -1; + return; } } #endif @@ -332,44 +347,42 @@ binlog_open(binlog log) twarn("Cannot write to binlog"); close(fd); binlog_dref(log); - return -1; + return; } - return fd; + if (written) *written = bytes_written; + log->fd = fd; } /* returns 1 on success, 0 on error. */ static int binlog_use_next() { - if (binlog_fd < 0) return 0; - if (next_binlog_fd < 0) return 0; - if (binlog_reserved > next_binlog_space) return twarnx("overextended"), 0; + binlog next; - binlog_close_last(); + if (!current_binlog) return 0; - binlog_fd = next_binlog_fd; - add_binlog(next_binlog); + next = current_binlog->next; - next_binlog = NULL; - next_binlog_fd = -1; + if (!next) return 0; - binlog_space = next_binlog_space - binlog_reserved; - binlog_reserved = next_binlog_reserved + binlog_reserved; + /* assert(current_binlog->reserved == 0); */ + + binlog_close(current_binlog); + current_binlog = next; - next_binlog_reserved = next_binlog_space = 0; return 1; } void -binlog_close() +binlog_shutdown() { binlog_use_next(); - binlog_close_last(); + binlog_close(current_binlog); } /* Returns the number of jobs successfully written (either 0 or 1). */ -/* If we are not using the binlog at all (binlog_fd < 0), then we pretend to +/* If we are not using the binlog at all (!current_binlog), then we pretend to have made a successful write and return 1. */ int binlog_write_job(job j) @@ -379,7 +392,7 @@ binlog_write_job(job j) struct iovec vec[4], *vptr; int vcnt = 3, r; - if (binlog_fd < 0) return 1; + if (!current_binlog) return 1; tube_namelen = 0; vec[0].iov_base = (char *) &tube_namelen; @@ -410,26 +423,26 @@ binlog_write_job(job j) return twarnx("unserializable job state: %d", j->state), 0; } - if (to_write > binlog_reserved) { + if (to_write > current_binlog->reserved) { r = binlog_use_next(); if (!r) return twarnx("failed to use next binlog"), 0; } - if (j->state && !j->binlog) j->binlog = binlog_iref(last_binlog); + if (j->state && !j->binlog) j->binlog = binlog_iref(current_binlog); while (to_write > 0) { - written = writev(binlog_fd, vec, vcnt); + written = writev(current_binlog->fd, vec, vcnt); if (written < 0) { if (errno == EAGAIN) continue; if (errno == EINTR) continue; twarn("writev"); - binlog_close_last(); + binlog_close(current_binlog); + current_binlog = 0; return 0; } - bytes_written += written; to_write -= written; if (to_write > 0 && written > 0) { for (vptr = vec; written >= vptr->iov_len; vptr++) { @@ -444,40 +457,129 @@ binlog_write_job(job j) return 1; } -/* Returns the number of bytes successfully reserved: either 0 or n. */ +static binlog +make_future_binlog() +{ + binlog b; + size_t header; + + /* open a new binlog with more space to reserve */ + b = make_next_binlog(); + if (!b) return twarnx("error making next binlog"), (binlog) 0; + binlog_open(b, &header); + + /* open failed, so we can't reserve any space */ + if (b->fd < 0) { + free(b); + return 0; + } + + b->free = binlog_size_limit - header; + b->reserved = 0; + return b; +} + +static int +can_move_reserved(size_t n, binlog from, binlog to) +{ + return from->reserved >= n && to->free >= n; +} + +static void +move_reserved(size_t n, binlog from, binlog to) +{ + from->reserved -= n; + from->free += n; + to->reserved += n; + to->free -= n; +} + static size_t -binlog_reserve_space(size_t n) +ensure_free_space(size_t n) { - /* This value must be nonzero but is otherwise ignored. */ - if (binlog_fd < 0) return 1; + binlog fb; - if (n <= binlog_space) { - binlog_space -= n; - binlog_reserved += n; - return n; + if (newest_binlog->free >= n) return n; + + /* open a new binlog */ + fb = make_future_binlog(); + if (!fb) return twarnx("make_future_binlog"), 0; + + add_binlog(fb); + return n; +} + +/* Preserve some invariants immediately after any space reservation. + * Invariant 1: current_binlog->reserved >= n. + * Invariant 2: current_binlog->reserved is congruent to n (mod z), where z + * is the size of a delete record in the binlog. */ +static size_t +maintain_invariant(size_t n) +{ + size_t reserved_later, remainder, complement, z, r; + + /* In this function, reserved bytes are conserved (they are neither created + * nor destroyed). We just move them around to preserve the invariant. We + * might have to create new free space (i.e. allocate a new binlog file), + * though. */ + + /* Invariant 1. */ + /* This is a loop, but it's guaranteed to run at most once. The proof is + * left as an exercise for the reader. */ + while (current_binlog->reserved < n) { + size_t to_move = current_binlog->reserved; + + r = ensure_free_space(to_move); + if (r != to_move) { + /* TODO unreserve n bytes */ + return twarnx("ensure_free_space"), 0; + } + + move_reserved(to_move, current_binlog, newest_binlog); + binlog_use_next(); } - if (n <= next_binlog_space) { - next_binlog_space -= n; - next_binlog_reserved += n; + /* Invariant 2. */ + + + z = sizeof(size_t) + sizeof(struct job); + reserved_later = current_binlog->reserved - n; + remainder = reserved_later % z; + if (remainder == 0) return n; + complement = z - remainder; + if (can_move_reserved(complement, newest_binlog, current_binlog)) { + move_reserved(complement, newest_binlog, current_binlog); return n; } - /* The next binlog is already allocated and it is full. */ - if (next_binlog_fd >= 0) return 0; + r = ensure_free_space(remainder); + if (r != remainder) { + /* TODO unreserve n bytes */ + return twarnx("ensure_free_space"), 0; + } + move_reserved(remainder, current_binlog, newest_binlog); - /* open a new binlog with more space to reserve */ - next_binlog = make_next_binlog(); - if (!next_binlog) return twarnx("error making next binlog"), 0; - next_binlog_fd = binlog_open(next_binlog); + return n; +} - /* open failed, so we can't reserve any space */ - if (next_binlog_fd < 0) return 0; +/* Returns the number of bytes successfully reserved: either 0 or n. */ +static size_t +binlog_reserve_space(size_t n) +{ + /* This return value must be nonzero but is otherwise ignored. */ + if (!current_binlog) return 1; - next_binlog_space = binlog_size_limit - bytes_written - n; - next_binlog_reserved = n; + if (current_binlog->free >= n) { + current_binlog->free -= n; + current_binlog->reserved += n; + return maintain_invariant(n); + } - return n; + ensure_free_space(n); + + newest_binlog->free -= n; + newest_binlog->reserved += n; + return maintain_invariant(n); } /* Returns the number of bytes reserved. */ @@ -509,8 +611,31 @@ binlog_reserve_space_update(job j) return binlog_reserve_space(z); } +int +binlog_lock() +{ + int r; + struct flock lock; + char path[PATH_MAX]; + + r = snprintf(path, PATH_MAX, "%s/lock", binlog_dir); + if (r > PATH_MAX) return twarnx("path too long: %s", binlog_dir), 0; + + lock_fd = open(path, O_WRONLY|O_CREAT, 0600); + if (lock_fd == -1) return twarn("open"), 0; + + lock.l_type = F_WRLCK; + lock.l_whence = SEEK_SET; + lock.l_start = 0; + lock.l_len = 0; + r = fcntl(lock_fd, F_SETLK, &lock); + if (r) return twarn("fcntl"), 0; + + return 1; +} + void -binlog_read(job binlog_jobs) +binlog_init(job binlog_jobs) { int binlog_index_min; struct stat sbuf; @@ -520,6 +645,8 @@ binlog_read(job binlog_jobs) if (!binlog_dir) return; + /* Recover any jobs in old binlogs */ + if (stat(binlog_dir, &sbuf) < 0) { if (mkdir(binlog_dir, 0700) < 0) return twarn("%s", binlog_dir); } else if (!(sbuf.st_mode & S_IFDIR)) { @@ -540,63 +667,38 @@ binlog_read(job binlog_jobs) twarn("%s", path); } else { b = binlog_iref(add_binlog(make_binlog(path))); - binlog_read_one(fd, binlog_jobs, path); + b->fd = fd; + binlog_read_log_file(b, binlog_jobs); close(fd); + b->fd = -1; binlog_dref(b); } } } -} -int -binlog_lock() -{ - int r; - struct flock lock; - char path[PATH_MAX]; - - r = snprintf(path, PATH_MAX, "%s/lock", binlog_dir); - if (r > PATH_MAX) return twarnx("path too long: %s", binlog_dir), 0; - - lock_fd = open(path, O_WRONLY|O_CREAT, 0600); - if (lock_fd == -1) return twarn("open"), 0; - - lock.l_type = F_WRLCK; - lock.l_whence = SEEK_SET; - lock.l_start = 0; - lock.l_len = 0; - r = fcntl(lock_fd, F_SETLK, &lock); - if (r) return twarn("fcntl"), 0; - return 1; -} - -void -binlog_init() -{ - binlog log; - - if (!binlog_dir) return; + /* Set up for writing out new jobs */ - log = make_next_binlog(); - if (!log) return twarnx("error making first binlog"); - binlog_fd = binlog_open(log); - if (binlog_fd >= 0) add_binlog(log); + b = make_next_binlog(); + if (!b) return twarnx("error making first binlog"); + binlog_open(b, 0); + if (b->fd >= 0) add_binlog(b); + current_binlog = b; } const char * binlog_oldest_index() { - if (!first_binlog) return "0"; + if (!oldest_binlog) return "0"; - return strrchr(first_binlog->path, '.') + 1; + return strrchr(oldest_binlog->path, '.') + 1; } const char * binlog_current_index() { - if (!last_binlog) return "0"; + if (!newest_binlog) return "0"; - return strrchr(last_binlog->path, '.') + 1; + return strrchr(newest_binlog->path, '.') + 1; } diff --git a/binlog.h b/binlog.h index b41eb5a..55e8907 100644 --- a/binlog.h +++ b/binlog.h @@ -21,18 +21,10 @@ #include "job.h" -typedef struct binlog *binlog; - -struct binlog { - binlog next; - unsigned int refs; - char path[]; -}; - extern char *binlog_dir; extern size_t binlog_size_limit; -void binlog_init(); +void binlog_init(job binlog_jobs); /* Return the number of locks acquired: either 0 or 1. */ int binlog_lock(); @@ -42,8 +34,7 @@ int binlog_write_job(job j); size_t binlog_reserve_space_put(job j); size_t binlog_reserve_space_update(job j); -void binlog_read(job binlog_jobs); -void binlog_close(); +void binlog_shutdown(); const char *binlog_oldest_index(); const char *binlog_current_index(); -- 2.11.4.GIT