Make sure fd is always initialized.
[beanstalkd.git] / binlog.c
blob06049d4b56c49daf129f9eeecede29ecd4533f02
/* binlog.c - binary log implementation */

/* Copyright (C) 2008 Graham Barr
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
19 #include "config.h"
21 #if HAVE_STDINT_H
22 # include <stdint.h>
23 #endif /* else we get int types from config.h */
25 #include <stdlib.h>
26 #include <stdio.h>
27 #include <fcntl.h>
28 #include <unistd.h>
29 #include <string.h>
30 #include <errno.h>
31 #include <dirent.h>
32 #include <sys/resource.h>
33 #include <sys/param.h>
34 #include <sys/uio.h>
35 #include <sys/stat.h>
36 #include <stdarg.h>
37 #include <limits.h>
38 #include <stddef.h>
40 #include "tube.h"
41 #include "job.h"
42 #include "binlog.h"
43 #include "util.h"
44 #include "port.h"
/* Binlogs are passed around everywhere through this pointer type. */
typedef struct binlog *binlog;

/* One log file, kept as a node of a singly linked list. */
struct binlog {
    binlog next;        /* following log in the list, or NULL */
    unsigned int refs;  /* reference count (binlog_iref / binlog_dref) */
    int fd;             /* descriptor of the file; -1 while it is not open */
    size_t free;        /* bytes of the file not yet reserved */
    size_t reserved;    /* bytes set aside for records still to be written */
    char path[];        /* NUL-terminated file name, allocated inline */
};
57 /* max size we will create a log file */
58 size_t binlog_size_limit = BINLOG_SIZE_LIMIT_DEFAULT;
60 int enable_fsync = 0;
61 size_t fsync_throttle_ms = 0;
62 uint64_t last_fsync = 0;
64 char *binlog_dir = NULL;
65 static int binlog_index = 0;
66 static int binlog_version = 5;
67 static int lock_fd;
69 static binlog oldest_binlog = 0,
70 current_binlog = 0,
71 newest_binlog = 0;
73 static const size_t job_record_size = offsetof(struct job, pad);
75 static int
76 binlog_scan_dir()
78 DIR *dirp;
79 struct dirent *dp;
80 long min = 0;
81 long max = 0;
82 long val;
83 char *endptr;
84 size_t name_len;
86 dirp = opendir(binlog_dir);
87 if (!dirp) return 0;
89 while ((dp = readdir(dirp)) != NULL) {
90 name_len = strlen(dp->d_name);
91 if (name_len > 7 && !strncmp("binlog.", dp->d_name, 7)) {
92 val = strtol(dp->d_name + 7, &endptr, 10);
93 if (endptr && *endptr == 0) {
94 if (max == 0 || val > max) max = val;
95 if (min == 0 || val < min) min = val;
100 closedir(dirp);
101 binlog_index = (int) max;
102 return (int) min;
105 static void
106 binlog_remove_oldest()
108 binlog b = oldest_binlog;
110 if (!b) return;
112 oldest_binlog = b->next;
114 unlink(b->path);
115 free(b);
118 static binlog
119 binlog_iref(binlog b)
121 if (b) b->refs++;
122 return b;
125 static void
126 binlog_dref(binlog b)
128 if (!b) return;
129 if (b->refs < 1) return twarnx("refs is zero for binlog: %s", b->path);
131 --b->refs;
132 if (b->refs < 1) {
133 while (oldest_binlog && oldest_binlog->refs == 0) {
134 binlog_remove_oldest();
/* Warn that something went wrong while reading the log at `path`, reporting
 * the current offset of `fd`.
 *
 * NOTE(review): the binlog_warn macro defined right after this function has
 * the same name and is what every visible caller uses, so this function looks
 * unused -- it may have been commented out in the original file; confirm
 * before relying on it. */
static void
binlog_warn(int fd, const char* path, const char *msg)
{
    /* lseek returns an off_t, which "%u" does not match; print the offset as
     * a long long instead. */
    warnx("WARNING, %s at %s:%lld.\n%s", msg, path,
          (long long) lseek(fd, 0, SEEK_CUR),
          " Continuing. You may be missing data.");
}
/* Warn about a problem met while reading binlog `b`.  `fmt` and its
 * arguments describe the problem; the log's path and current file offset are
 * appended.  `b` is evaluated twice, so pass a side-effect-free expression.
 * The offset comes back from lseek as an off_t, so it is converted to
 * long long and printed with %lld. */
#define binlog_warn(b, fmt, args...) \
    warnx("WARNING, " fmt " at %s:%lld. %s: ", \
          ##args, (b)->path, (long long) lseek((b)->fd, 0, SEEK_CUR), \
          "Continuing. You may be missing data.")
153 static void
154 binlog_read_log_file(binlog b, job binlog_jobs)
156 struct job js;
157 tube t;
158 job j;
159 char tubename[MAX_TUBE_NAME_LEN];
160 size_t namelen;
161 ssize_t r;
162 int version;
164 r = read(b->fd, &version, sizeof(version));
165 if (r == -1) return twarn("read()");
166 if (r < sizeof(version)) {
167 return binlog_warn(b, "EOF while reading version record");
170 if (version != binlog_version) {
171 return warnx("%s: binlog version mismatch %d %d", b->path, version,
172 binlog_version);
175 while (read(b->fd, &namelen, sizeof(size_t)) == sizeof(size_t)) {
176 if (namelen >= MAX_TUBE_NAME_LEN) {
177 return binlog_warn(b, "namelen %d exceeds maximum of %d", namelen, MAX_TUBE_NAME_LEN - 1);
180 if (namelen > 0) {
181 r = read(b->fd, tubename, namelen);
182 if (r == -1) return twarn("read()");
183 if (r < namelen) {
184 lseek(b->fd, SEEK_CUR, 0);
185 return binlog_warn(b, "EOF while reading tube name");
189 tubename[namelen] = '\0';
190 r = read(b->fd, &js, job_record_size);
191 if (r == -1) return twarn("read()");
192 if (r < job_record_size) {
193 return binlog_warn(b, "EOF while reading job record");
196 if (!js.id) break;
198 j = job_find(js.id);
199 switch (js.state) {
200 case JOB_STATE_INVALID:
201 if (j) {
202 job_remove(j);
203 binlog_dref(j->binlog);
204 job_free(j);
205 j = NULL;
207 break;
208 case JOB_STATE_READY:
209 case JOB_STATE_DELAYED:
210 if (!j && namelen > 0) {
211 t = tube_find_or_make(tubename);
212 j = make_job_with_id(js.pri, js.delay, js.ttr, js.body_size,
213 t, js.id);
214 j->next = j->prev = j;
215 j->created_at = js.created_at;
216 job_insert(binlog_jobs, j);
218 if (js.body_size && namelen > 0) { /* namelen > 0 only on new jobs */
219 if (js.body_size > j->body_size) {
220 warnx("job size increased from %zu to %zu", j->body_size,
221 js.body_size);
222 job_remove(j);
223 binlog_dref(j->binlog);
224 job_free(j);
225 return binlog_warn(b, "EOF while reading job body");
227 r = read(b->fd, j->body, js.body_size);
228 if (r == -1) return twarn("read()");
229 if (r < js.body_size) {
230 warnx("dropping incomplete job %llu", j->id);
231 job_remove(j);
232 binlog_dref(j->binlog);
233 job_free(j);
234 return binlog_warn(b, "EOF while reading job body");
237 break;
239 if (j) {
240 j->state = js.state;
241 j->deadline_at = js.deadline_at;
242 j->pri = js.pri;
243 j->delay = js.delay;
244 j->ttr = js.ttr;
245 j->timeout_ct = js.timeout_ct;
246 j->release_ct = js.release_ct;
247 j->bury_ct = js.bury_ct;
248 j->kick_ct = js.kick_ct;
250 /* this is a complete record, so we can move the binlog ref */
251 if (namelen && js.body_size) {
252 binlog_dref(j->binlog);
253 j->binlog = binlog_iref(b);
259 static void
260 binlog_close(binlog b)
262 int r;
264 if (!b) return;
265 if (b->fd < 0) return;
266 if (b->free) {
267 // Some compilers give a warning if the return value of ftruncate is
268 // ignored. So we pretend to use it.
269 r = ftruncate(b->fd, binlog_size_limit - b->free);
270 if (r == -1) {
271 // Nothing we can do. The user might see warnings next startup.
274 close(b->fd);
275 b->fd = -1;
276 binlog_dref(b);
279 static binlog
280 make_binlog(char *path)
282 binlog b;
284 b = (binlog) malloc(sizeof(struct binlog) + strlen(path) + 1);
285 if (!b) return twarnx("OOM"), (binlog) 0;
286 strcpy(b->path, path);
287 b->refs = 0;
288 b->next = NULL;
289 b->fd = -1;
290 b->free = 0;
291 b->reserved = 0;
292 return b;
295 static binlog
296 make_next_binlog()
298 int r;
299 char path[PATH_MAX];
301 if (!binlog_dir) return NULL;
303 r = snprintf(path, PATH_MAX, "%s/binlog.%d", binlog_dir, ++binlog_index);
304 if (r > PATH_MAX) return twarnx("path too long: %s", binlog_dir), (binlog)0;
306 return make_binlog(path);
309 static binlog
310 add_binlog(binlog b)
312 if (newest_binlog) newest_binlog->next = b;
313 newest_binlog = b;
314 if (!oldest_binlog) oldest_binlog = b;
316 return b;
/* Portable stand-in for posix_fallocate: writes `len` zero bytes to fd and
 * then moves the file offset back to the start of the file.
 *
 * Only offset 0 is supported.  Returns 0 on success, EINVAL when offset is
 * nonzero or len is not positive, and otherwise the errno left by the
 * failing write or lseek call. */
static int
beanstalkd_fallocate(int fd, off_t offset, off_t len)
{
    off_t done;
    ssize_t w;
    off_t p;
#define ZERO_BUF_SIZE 512
    char buf[ZERO_BUF_SIZE] = {0}; /* initialize to zero */

    /* we only support a 0 offset */
    if (offset != 0) return EINVAL;

    if (len <= 0) return EINVAL;

    for (done = 0; done < len; done += w) {
        size_t chunk = ZERO_BUF_SIZE;

        /* Shrink the last chunk so exactly len bytes are written; always
         * writing a full buffer overshoots len whenever it is not a multiple
         * of ZERO_BUF_SIZE. */
        if (len - done < (off_t) ZERO_BUF_SIZE) chunk = (size_t) (len - done);

        w = write(fd, buf, chunk);
        if (w == -1) return errno;
    }

    p = lseek(fd, 0, SEEK_SET);
    if (p == -1) return errno;

    return 0;
}
344 static void
345 binlog_open(binlog log, size_t *written)
347 int fd;
348 size_t bytes_written;
350 if (written) *written = 0;
352 if (!binlog_iref(log)) return;
354 fd = open(log->path, O_WRONLY | O_CREAT, 0400);
356 if (fd < 0) return twarn("Cannot open binlog %s", log->path);
358 #ifdef HAVE_POSIX_FALLOCATE
360 int r;
361 r = posix_fallocate(fd, 0, binlog_size_limit);
362 if (r == EINVAL) {
363 r = beanstalkd_fallocate(fd, 0, binlog_size_limit);
365 if (r) {
366 close(fd);
367 binlog_dref(log);
368 errno = r;
369 return twarn("Cannot allocate space for binlog %s", log->path);
372 #else
373 /* Allocate space in a slow but portable way. */
375 int r;
376 r = beanstalkd_fallocate(fd, 0, binlog_size_limit);
377 if (r) {
378 close(fd);
379 binlog_dref(log);
380 errno = r;
381 return twarn("Cannot allocate space for binlog %s", log->path);
384 #endif
386 bytes_written = write(fd, &binlog_version, sizeof(int));
387 if (written) *written = bytes_written;
389 if (bytes_written < sizeof(int)) {
390 twarn("Cannot write to binlog");
391 close(fd);
392 binlog_dref(log);
393 return;
396 log->fd = fd;
399 /* returns 1 on success, 0 on error. */
400 static int
401 binlog_use_next()
403 binlog next;
405 if (!current_binlog) return 0;
407 next = current_binlog->next;
409 if (!next) return 0;
411 /* assert(current_binlog->reserved == 0); */
413 binlog_close(current_binlog);
414 current_binlog = next;
416 return 1;
419 void
420 binlog_shutdown()
422 binlog_use_next();
423 binlog_close(current_binlog);
426 /* Returns the number of jobs successfully written (either 0 or 1).
428 If this fails, something is seriously wrong. It should never fail because of
429 a full disk. (The binlog_reserve_space_* functions, on the other hand, can
430 fail because of a full disk.)
432 If we are not using the binlog at all (!current_binlog), then we pretend to
433 have made a successful write and return 1. */
435 binlog_write_job(job j)
437 ssize_t written;
438 size_t tube_namelen, to_write = 0;
439 struct iovec vec[4], *vptr;
440 int vcnt = 3, r;
441 uint64_t now;
443 if (!current_binlog) return 1;
444 tube_namelen = 0;
446 vec[0].iov_base = (char *) &tube_namelen;
447 to_write += vec[0].iov_len = sizeof(size_t);
449 vec[1].iov_base = j->tube->name;
450 vec[1].iov_len = 0;
452 vec[2].iov_base = (char *) j;
453 to_write += vec[2].iov_len = job_record_size;
455 if (j->state == JOB_STATE_READY || j->state == JOB_STATE_DELAYED ||
456 j->state == JOB_STATE_BURIED) {
457 if (!j->binlog) {
458 tube_namelen = strlen(j->tube->name);
459 to_write += vec[1].iov_len = tube_namelen;
460 vcnt = 4;
461 vec[3].iov_base = j->body;
462 to_write += vec[3].iov_len = j->body_size;
464 } else if (j->state == JOB_STATE_INVALID) {
465 if (j->binlog) binlog_dref(j->binlog);
466 j->binlog = NULL;
467 } else {
468 return twarnx("unserializable job state: %d", j->state), 0;
471 if (to_write > current_binlog->reserved) {
472 r = binlog_use_next();
473 if (!r) return twarnx("failed to use next binlog"), 0;
476 if (j->state && !j->binlog) j->binlog = binlog_iref(current_binlog);
478 while (to_write > 0) {
479 written = writev(current_binlog->fd, vec, vcnt);
481 if (written < 0) {
482 if (errno == EAGAIN) continue;
483 if (errno == EINTR) continue;
485 twarn("writev");
486 binlog_close(current_binlog);
487 current_binlog = 0;
488 return 0;
491 current_binlog->reserved -= written;
492 j->reserved_binlog_space -= written;
494 to_write -= written;
495 if (to_write > 0 && written > 0) {
496 for (vptr = vec; written >= vptr->iov_len; vptr++) {
497 written -= vptr->iov_len;
498 vptr->iov_len = 0;
500 vptr->iov_base = (char *) vptr->iov_base + written;
501 vptr->iov_len -= written;
505 now = now_usec() / 1000; /* usec -> msec */
506 if (enable_fsync && now - last_fsync >= fsync_throttle_ms) {
507 r = fdatasync(current_binlog->fd);
508 if (r == -1) return twarn("fdatasync"), 0;
509 last_fsync = now;
512 return 1;
515 static binlog
516 make_future_binlog()
518 binlog b;
519 size_t header;
521 /* open a new binlog with more space to reserve */
522 b = make_next_binlog();
523 if (!b) return twarnx("error making next binlog"), (binlog) 0;
524 binlog_open(b, &header);
526 /* open failed, so we can't reserve any space */
527 if (b->fd < 0) {
528 free(b);
529 return 0;
532 b->free = binlog_size_limit - header;
533 b->reserved = 0;
534 return b;
537 static int
538 can_move_reserved(size_t n, binlog from, binlog to)
540 return from->reserved >= n && to->free >= n;
543 static void
544 move_reserved(size_t n, binlog from, binlog to)
546 from->reserved -= n;
547 from->free += n;
548 to->reserved += n;
549 to->free -= n;
552 static size_t
553 ensure_free_space(size_t n)
555 binlog fb;
557 if (newest_binlog && newest_binlog->free >= n) return n;
559 /* open a new binlog */
560 fb = make_future_binlog();
561 if (!fb) return twarnx("make_future_binlog"), 0;
563 add_binlog(fb);
564 return n;
567 /* Ensures:
568 * 1: b->reserved is congruent to n (mod z).
569 * 2: all future binlogs ->reserved is congruent to 0 (mod z).
570 * Returns t on success, and 0 on failure. (Therefore t should be nonzero.) */
571 static size_t
572 maintain_invariants_iter(binlog b, size_t n, size_t t)
574 size_t reserved_later, remainder, complement, z, r;
576 /* In this function, reserved bytes are conserved (they are neither created
577 * nor destroyed). We just move them around to preserve the invariant. We
578 * might have to create new free space (i.e. allocate a new binlog file),
579 * though. */
581 if (!b) return t;
583 z = sizeof(size_t) + job_record_size;
584 reserved_later = b->reserved - n;
585 remainder = reserved_later % z;
586 if (remainder == 0) return maintain_invariants_iter(b->next, 0, t);
588 if (b == newest_binlog) {
589 twarnx("newest binlog has invalid %zd reserved", b->reserved);
590 /* We have failed, so undo the reservation and return 0. */
591 if (newest_binlog->reserved >= t) {
592 newest_binlog->reserved -= t;
593 } else {
594 twarnx("failed to unreserve %zd bytes", t); /* can't happen */
596 return 0;
599 complement = z - remainder;
600 if (can_move_reserved(complement, newest_binlog, b)) {
601 move_reserved(complement, newest_binlog, b);
602 return maintain_invariants_iter(b->next, 0, t);
605 r = ensure_free_space(remainder);
606 if (r != remainder) {
607 twarnx("ensure_free_space");
608 /* We have failed, so undo the reservation and return 0. */
609 if (newest_binlog->reserved >= t) {
610 newest_binlog->reserved -= t;
611 } else {
612 twarnx("failed to unreserve %zd bytes", t); /* can't happen */
614 return 0;
616 move_reserved(remainder, b, newest_binlog);
617 return maintain_invariants_iter(b->next, 0, t);
621 /* Preserve some invariants immediately after any space reservation
622 * (where z is the size of a delete record in the binlog).
623 * Invariant 1: current_binlog->reserved >= n.
624 * Invariant 2: current_binlog->reserved is congruent to n (mod z).
625 * Invariant 3: all future binlogs ->reserved is congruent to 0 (mod z). */
626 static size_t
627 maintain_invariants(size_t n)
629 size_t r;
631 /* In this function, reserved bytes are conserved (they are neither created
632 * nor destroyed). We just move them around to preserve the invariant. We
633 * might have to create new free space (i.e. allocate a new binlog file),
634 * though. */
636 /* Invariant 1. */
637 /* This is a loop, but it's guaranteed to run at most once. The proof is
638 * left as an exercise for the reader. */
639 while (current_binlog->reserved < n) {
640 size_t to_move = current_binlog->reserved;
642 r = ensure_free_space(to_move);
643 if (r != to_move) {
644 twarnx("ensure_free_space");
645 /* We have failed, so undo the reservation and return 0. */
646 if (newest_binlog->reserved >= n) {
647 newest_binlog->reserved -= n;
648 } else {
649 twarnx("failed to unreserve %zd bytes", n); /* can't happen */
651 return 0;
654 move_reserved(to_move, current_binlog, newest_binlog);
655 binlog_use_next();
658 /* Invariants 2 and 3. */
659 return maintain_invariants_iter(current_binlog, n, n);
662 /* Returns the number of bytes successfully reserved: either 0 or n. */
663 static size_t
664 binlog_reserve_space(size_t n)
666 size_t r;
668 /* This return value must be nonzero but is otherwise ignored. */
669 if (!current_binlog) return 1;
671 if (current_binlog->free >= n) {
672 current_binlog->free -= n;
673 current_binlog->reserved += n;
674 return maintain_invariants(n);
677 r = ensure_free_space(n);
678 if (r != n) return twarnx("ensure_free_space"), 0;
680 newest_binlog->free -= n;
681 newest_binlog->reserved += n;
682 return maintain_invariants(n);
685 /* Returns the number of bytes reserved. */
686 size_t
687 binlog_reserve_space_put(job j)
689 size_t z = 0;
691 /* reserve space for the initial job record */
692 z += sizeof(size_t);
693 z += strlen(j->tube->name);
694 z += job_record_size;
695 z += j->body_size;
697 /* plus space for a delete to come later */
698 z += sizeof(size_t);
699 z += job_record_size;
701 return binlog_reserve_space(z);
704 size_t
705 binlog_reserve_space_update(job j)
707 size_t z = 0;
709 z += sizeof(size_t);
710 z += job_record_size;
711 return binlog_reserve_space(z);
715 binlog_lock()
717 int r;
718 struct flock lock;
719 char path[PATH_MAX];
721 r = snprintf(path, PATH_MAX, "%s/lock", binlog_dir);
722 if (r > PATH_MAX) return twarnx("path too long: %s", binlog_dir), 0;
724 lock_fd = open(path, O_WRONLY|O_CREAT, 0600);
725 if (lock_fd == -1) return twarn("open"), 0;
727 lock.l_type = F_WRLCK;
728 lock.l_whence = SEEK_SET;
729 lock.l_start = 0;
730 lock.l_len = 0;
731 r = fcntl(lock_fd, F_SETLK, &lock);
732 if (r) return twarn("fcntl"), 0;
734 return 1;
737 void
738 binlog_init(job binlog_jobs)
740 int binlog_index_min;
741 struct stat sbuf;
742 int fd, idx, r;
743 size_t n;
744 char path[PATH_MAX];
745 binlog b;
747 if (!binlog_dir) return;
749 /* Recover any jobs in old binlogs */
751 if (stat(binlog_dir, &sbuf) < 0) {
752 if (mkdir(binlog_dir, 0700) < 0) return twarn("%s", binlog_dir);
753 } else if (!(sbuf.st_mode & S_IFDIR)) {
754 twarnx("%s", binlog_dir);
755 return;
758 binlog_index_min = binlog_scan_dir();
760 if (binlog_index_min) {
761 for (idx = binlog_index_min; idx <= binlog_index; idx++) {
762 r = snprintf(path, PATH_MAX, "%s/binlog.%d", binlog_dir, idx);
763 if (r > PATH_MAX) return twarnx("path too long: %s", binlog_dir);
765 fd = open(path, O_RDONLY);
767 if (fd < 0) {
768 twarn("%s", path);
769 } else {
770 b = binlog_iref(add_binlog(make_binlog(path)));
771 b->fd = fd;
772 binlog_read_log_file(b, binlog_jobs);
773 close(fd);
774 b->fd = -1;
775 binlog_dref(b);
782 /* Set up for writing out new jobs */
783 n = ensure_free_space(1);
784 if (!n) return twarnx("error making first writable binlog");
786 current_binlog = newest_binlog;
789 const char *
790 binlog_oldest_index()
792 if (!oldest_binlog) return "0";
794 return strrchr(oldest_binlog->path, '.') + 1;
797 const char *
798 binlog_current_index()
800 if (!newest_binlog) return "0";
802 return strrchr(newest_binlog->path, '.') + 1;