Fixes for make distcheck.
[beanstalkd.git] / binlog.c
blob43e51a0eb0fcd8dff8d1b84655451a0414ba6c68
1 /* binlog.c - binary log implementation */
3 /* Copyright (C) 2008 Graham Barr
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #include "config.h"
21 #if HAVE_STDINT_H
22 # include <stdint.h>
23 #endif /* else we get int types from config.h */
25 #include <stdlib.h>
26 #include <stdio.h>
27 #include <fcntl.h>
28 #include <unistd.h>
29 #include <string.h>
30 #include <errno.h>
31 #include <dirent.h>
32 #include <sys/resource.h>
33 #include <sys/param.h>
34 #include <sys/uio.h>
35 #include <sys/stat.h>
36 #include <stdarg.h>
37 #include <limits.h>
38 #include <stddef.h>
40 #include "tube.h"
41 #include "job.h"
42 #include "binlog.h"
43 #include "util.h"
45 typedef struct binlog *binlog;
47 struct binlog {
48 binlog next;
49 unsigned int refs;
50 int fd;
51 size_t free;
52 size_t reserved;
53 char path[];
56 /* max size we will create a log file */
57 size_t binlog_size_limit = BINLOG_SIZE_LIMIT_DEFAULT;
59 int enable_fsync = 0;
60 size_t fsync_throttle_ms = 0;
61 uint64_t last_fsync = 0;
63 char *binlog_dir = NULL;
64 static int binlog_index = 0;
65 static int binlog_version = 5;
66 static int lock_fd;
68 static binlog oldest_binlog = 0,
69 current_binlog = 0,
70 newest_binlog = 0;
72 static const size_t job_record_size = offsetof(struct job, pad);
74 static int
75 binlog_scan_dir()
77 DIR *dirp;
78 struct dirent *dp;
79 long min = 0;
80 long max = 0;
81 long val;
82 char *endptr;
83 size_t name_len;
85 dirp = opendir(binlog_dir);
86 if (!dirp) return 0;
88 while ((dp = readdir(dirp)) != NULL) {
89 name_len = strlen(dp->d_name);
90 if (name_len > 7 && !strncmp("binlog.", dp->d_name, 7)) {
91 val = strtol(dp->d_name + 7, &endptr, 10);
92 if (endptr && *endptr == 0) {
93 if (max == 0 || val > max) max = val;
94 if (min == 0 || val < min) min = val;
99 closedir(dirp);
100 binlog_index = (int) max;
101 return (int) min;
104 static void
105 binlog_remove_oldest()
107 binlog b = oldest_binlog;
109 if (!b) return;
111 oldest_binlog = b->next;
113 unlink(b->path);
114 free(b);
117 static binlog
118 binlog_iref(binlog b)
120 if (b) b->refs++;
121 return b;
124 static void
125 binlog_dref(binlog b)
127 if (!b) return;
128 if (b->refs < 1) return twarnx("refs is zero for binlog: %s", b->path);
130 --b->refs;
131 if (b->refs < 1) {
132 while (oldest_binlog && oldest_binlog->refs == 0) {
133 binlog_remove_oldest();
139 static void
140 binlog_warn(int fd, const char* path, const char *msg)
142 warnx("WARNING, %s at %s:%u.\n%s", msg, path, lseek(fd, 0, SEEK_CUR),
143 " Continuing. You may be missing data.");
147 #define binlog_warn(b, fmt, args...) \
148 warnx("WARNING, " fmt " at %s:%u. %s: ", \
149 ##args, b->path, lseek(b->fd, 0, SEEK_CUR), \
150 "Continuing. You may be missing data.")
152 static void
153 binlog_read_log_file(binlog b, job binlog_jobs)
155 struct job js;
156 tube t;
157 job j;
158 char tubename[MAX_TUBE_NAME_LEN];
159 size_t namelen;
160 ssize_t r;
161 int version;
163 r = read(b->fd, &version, sizeof(version));
164 if (r == -1) return twarn("read()");
165 if (r < sizeof(version)) {
166 return binlog_warn(b, "EOF while reading version record");
169 if (version != binlog_version) {
170 return warnx("%s: binlog version mismatch %d %d", b->path, version,
171 binlog_version);
174 while (read(b->fd, &namelen, sizeof(size_t)) == sizeof(size_t)) {
175 if (namelen > 0) {
176 r = read(b->fd, tubename, namelen);
177 if (r == -1) return twarn("read()");
178 if (r < namelen) {
179 lseek(b->fd, SEEK_CUR, 0);
180 return binlog_warn(b, "EOF while reading tube name");
184 tubename[namelen] = '\0';
185 r = read(b->fd, &js, job_record_size);
186 if (r == -1) return twarn("read()");
187 if (r < job_record_size) {
188 return binlog_warn(b, "EOF while reading job record");
191 if (!js.id) break;
193 j = job_find(js.id);
194 switch (js.state) {
195 case JOB_STATE_INVALID:
196 if (j) {
197 job_remove(j);
198 binlog_dref(j->binlog);
199 job_free(j);
200 j = NULL;
202 break;
203 case JOB_STATE_READY:
204 case JOB_STATE_DELAYED:
205 if (!j && namelen > 0) {
206 t = tube_find_or_make(tubename);
207 j = make_job_with_id(js.pri, js.delay, js.ttr, js.body_size,
208 t, js.id);
209 j->next = j->prev = j;
210 j->created_at = js.created_at;
211 job_insert(binlog_jobs, j);
213 if (js.body_size) {
214 if (js.body_size > j->body_size) {
215 warnx("job size increased from %zu to %zu", j->body_size,
216 js.body_size);
217 job_remove(j);
218 binlog_dref(j->binlog);
219 job_free(j);
220 return binlog_warn(b, "EOF while reading job body");
222 r = read(b->fd, j->body, js.body_size);
223 if (r == -1) return twarn("read()");
224 if (r < js.body_size) {
225 warnx("dropping incomplete job %llu", j->id);
226 job_remove(j);
227 binlog_dref(j->binlog);
228 job_free(j);
229 return binlog_warn(b, "EOF while reading job body");
232 break;
234 if (j) {
235 j->state = js.state;
236 j->deadline_at = js.deadline_at;
237 j->pri = js.pri;
238 j->delay = js.delay;
239 j->ttr = js.ttr;
240 j->timeout_ct = js.timeout_ct;
241 j->release_ct = js.release_ct;
242 j->bury_ct = js.bury_ct;
243 j->kick_ct = js.kick_ct;
245 /* this is a complete record, so we can move the binlog ref */
246 if (namelen && js.body_size) {
247 binlog_dref(j->binlog);
248 j->binlog = binlog_iref(b);
254 static void
255 binlog_close(binlog b)
257 if (!b) return;
258 if (b->fd < 0) return;
259 close(b->fd);
260 b->fd = -1;
261 binlog_dref(b);
264 static binlog
265 make_binlog(char *path)
267 binlog b;
269 b = (binlog) malloc(sizeof(struct binlog) + strlen(path) + 1);
270 if (!b) return twarnx("OOM"), (binlog) 0;
271 strcpy(b->path, path);
272 b->refs = 0;
273 b->next = NULL;
274 b->fd = -1;
275 b->free = 0;
276 b->reserved = 0;
277 return b;
280 static binlog
281 make_next_binlog()
283 int r;
284 char path[PATH_MAX];
286 if (!binlog_dir) return NULL;
288 r = snprintf(path, PATH_MAX, "%s/binlog.%d", binlog_dir, ++binlog_index);
289 if (r > PATH_MAX) return twarnx("path too long: %s", binlog_dir), (binlog)0;
291 return make_binlog(path);
294 static binlog
295 add_binlog(binlog b)
297 if (newest_binlog) newest_binlog->next = b;
298 newest_binlog = b;
299 if (!oldest_binlog) oldest_binlog = b;
301 return b;
304 static void
305 binlog_open(binlog log, size_t *written)
307 int fd;
308 size_t bytes_written;
310 if (written) *written = 0;
312 if (!binlog_iref(log)) return;
314 fd = open(log->path, O_WRONLY | O_CREAT, 0400);
316 if (fd < 0) return twarn("Cannot open binlog %s", log->path);
318 #ifdef HAVE_POSIX_FALLOCATE
320 int r;
321 r = posix_fallocate(fd, 0, binlog_size_limit);
322 if (r) {
323 close(fd);
324 binlog_dref(log);
325 errno = r;
326 return twarn("Cannot allocate space for binlog %s", log->path);
329 #else
330 /* Allocate space in a slow but portable way. */
332 size_t i;
333 ssize_t w;
334 off_t p;
335 #define ZERO_BUF_SIZE 512
336 char buf[ZERO_BUF_SIZE] = {}; /* initialize to zero */
338 for (i = 0; i < binlog_size_limit; i += w) {
339 w = write(fd, &buf, ZERO_BUF_SIZE);
340 if (w == -1) {
341 twarn("Cannot allocate space for binlog %s", log->path);
342 close(fd);
343 binlog_dref(log);
344 return;
348 p = lseek(fd, 0, SEEK_SET);
349 if (p == -1) {
350 twarn("lseek");
351 close(fd);
352 binlog_dref(log);
353 return;
356 #endif
358 bytes_written = write(fd, &binlog_version, sizeof(int));
359 if (written) *written = bytes_written;
361 if (bytes_written < sizeof(int)) {
362 twarn("Cannot write to binlog");
363 close(fd);
364 binlog_dref(log);
365 return;
368 log->fd = fd;
371 /* returns 1 on success, 0 on error. */
372 static int
373 binlog_use_next()
375 binlog next;
377 if (!current_binlog) return 0;
379 next = current_binlog->next;
381 if (!next) return 0;
383 /* assert(current_binlog->reserved == 0); */
385 binlog_close(current_binlog);
386 current_binlog = next;
388 return 1;
391 void
392 binlog_shutdown()
394 binlog_use_next();
395 binlog_close(current_binlog);
398 /* Returns the number of jobs successfully written (either 0 or 1).
400 If this fails, something is seriously wrong. It should never fail because of
401 a full disk. (The binlog_reserve_space_* functions, on the other hand, can
402 fail because of a full disk.)
404 If we are not using the binlog at all (!current_binlog), then we pretend to
405 have made a successful write and return 1. */
407 binlog_write_job(job j)
409 ssize_t written;
410 size_t tube_namelen, to_write = 0;
411 struct iovec vec[4], *vptr;
412 int vcnt = 3, r;
413 uint64_t now;
415 if (!current_binlog) return 1;
416 tube_namelen = 0;
418 vec[0].iov_base = (char *) &tube_namelen;
419 to_write += vec[0].iov_len = sizeof(size_t);
421 vec[1].iov_base = j->tube->name;
422 vec[1].iov_len = 0;
424 vec[2].iov_base = (char *) j;
425 to_write += vec[2].iov_len = job_record_size;
427 if (j->state == JOB_STATE_READY || j->state == JOB_STATE_DELAYED) {
428 if (!j->binlog) {
429 tube_namelen = strlen(j->tube->name);
430 to_write += vec[1].iov_len = tube_namelen;
431 vcnt = 4;
432 vec[3].iov_base = j->body;
433 to_write += vec[3].iov_len = j->body_size;
435 } else if (j->state == JOB_STATE_INVALID) {
436 if (j->binlog) binlog_dref(j->binlog);
437 j->binlog = NULL;
438 } else {
439 return twarnx("unserializable job state: %d", j->state), 0;
442 if (to_write > current_binlog->reserved) {
443 r = binlog_use_next();
444 if (!r) return twarnx("failed to use next binlog"), 0;
447 if (j->state && !j->binlog) j->binlog = binlog_iref(current_binlog);
449 while (to_write > 0) {
450 written = writev(current_binlog->fd, vec, vcnt);
452 if (written < 0) {
453 if (errno == EAGAIN) continue;
454 if (errno == EINTR) continue;
456 twarn("writev");
457 binlog_close(current_binlog);
458 current_binlog = 0;
459 return 0;
462 to_write -= written;
463 if (to_write > 0 && written > 0) {
464 for (vptr = vec; written >= vptr->iov_len; vptr++) {
465 written -= vptr->iov_len;
466 vptr->iov_len = 0;
468 vptr->iov_base = (char *) vptr->iov_base + written;
469 vptr->iov_len -= written;
471 current_binlog->reserved -= written;
472 j->reserved_binlog_space -= written;
475 now = now_usec() / 1000; /* usec -> msec */
476 if (enable_fsync && now - last_fsync >= fsync_throttle_ms) {
477 r = fdatasync(current_binlog->fd);
478 if (r == -1) return twarn("fdatasync"), 0;
479 last_fsync = now;
482 return 1;
485 static binlog
486 make_future_binlog()
488 binlog b;
489 size_t header;
491 /* open a new binlog with more space to reserve */
492 b = make_next_binlog();
493 if (!b) return twarnx("error making next binlog"), (binlog) 0;
494 binlog_open(b, &header);
496 /* open failed, so we can't reserve any space */
497 if (b->fd < 0) {
498 free(b);
499 return 0;
502 b->free = binlog_size_limit - header;
503 b->reserved = 0;
504 return b;
507 static int
508 can_move_reserved(size_t n, binlog from, binlog to)
510 return from->reserved >= n && to->free >= n;
513 static void
514 move_reserved(size_t n, binlog from, binlog to)
516 from->reserved -= n;
517 from->free += n;
518 to->reserved += n;
519 to->free -= n;
522 static size_t
523 ensure_free_space(size_t n)
525 binlog fb;
527 if (newest_binlog && newest_binlog->free >= n) return n;
529 /* open a new binlog */
530 fb = make_future_binlog();
531 if (!fb) return twarnx("make_future_binlog"), 0;
533 add_binlog(fb);
534 return n;
537 /* Preserve some invariants immediately after any space reservation.
538 * Invariant 1: current_binlog->reserved >= n.
539 * Invariant 2: current_binlog->reserved is congruent to n (mod z), where z
540 * is the size of a delete record in the binlog. */
541 static size_t
542 maintain_invariant(size_t n)
544 size_t reserved_later, remainder, complement, z, r;
546 /* In this function, reserved bytes are conserved (they are neither created
547 * nor destroyed). We just move them around to preserve the invariant. We
548 * might have to create new free space (i.e. allocate a new binlog file),
549 * though. */
551 /* Invariant 1. */
552 /* This is a loop, but it's guaranteed to run at most once. The proof is
553 * left as an exercise for the reader. */
554 while (current_binlog->reserved < n) {
555 size_t to_move = current_binlog->reserved;
557 r = ensure_free_space(to_move);
558 if (r != to_move) {
559 twarnx("ensure_free_space");
560 if (newest_binlog->reserved >= n) {
561 newest_binlog->reserved -= n;
562 } else {
563 twarnx("failed to unreserve %zd bytes", n); /* can't happen */
565 return 0;
568 move_reserved(to_move, current_binlog, newest_binlog);
569 binlog_use_next();
573 /* Invariant 2. */
575 z = sizeof(size_t) + job_record_size;
576 reserved_later = current_binlog->reserved - n;
577 remainder = reserved_later % z;
578 if (remainder == 0) return n;
579 complement = z - remainder;
580 if (can_move_reserved(complement, newest_binlog, current_binlog)) {
581 move_reserved(complement, newest_binlog, current_binlog);
582 return n;
585 r = ensure_free_space(remainder);
586 if (r != remainder) {
587 twarnx("ensure_free_space");
588 if (newest_binlog->reserved >= n) {
589 newest_binlog->reserved -= n;
590 } else {
591 twarnx("failed to unreserve %zd bytes", n); /* can't happen */
593 return 0;
595 move_reserved(remainder, current_binlog, newest_binlog);
597 return n;
600 /* Returns the number of bytes successfully reserved: either 0 or n. */
601 static size_t
602 binlog_reserve_space(size_t n)
604 size_t r;
606 /* This return value must be nonzero but is otherwise ignored. */
607 if (!current_binlog) return 1;
609 if (current_binlog->free >= n) {
610 current_binlog->free -= n;
611 current_binlog->reserved += n;
612 return maintain_invariant(n);
615 r = ensure_free_space(n);
616 if (r != n) return twarnx("ensure_free_space"), 0;
618 newest_binlog->free -= n;
619 newest_binlog->reserved += n;
620 return maintain_invariant(n);
623 /* Returns the number of bytes reserved. */
624 size_t
625 binlog_reserve_space_put(job j)
627 size_t z = 0;
629 /* reserve space for the initial job record */
630 z += sizeof(size_t);
631 z += strlen(j->tube->name);
632 z += job_record_size;
633 z += j->body_size;
635 /* plus space for a delete to come later */
636 z += sizeof(size_t);
637 z += job_record_size;
639 return binlog_reserve_space(z);
642 size_t
643 binlog_reserve_space_update(job j)
645 size_t z = 0;
647 z += sizeof(size_t);
648 z += job_record_size;
649 return binlog_reserve_space(z);
653 binlog_lock()
655 int r;
656 struct flock lock;
657 char path[PATH_MAX];
659 r = snprintf(path, PATH_MAX, "%s/lock", binlog_dir);
660 if (r > PATH_MAX) return twarnx("path too long: %s", binlog_dir), 0;
662 lock_fd = open(path, O_WRONLY|O_CREAT, 0600);
663 if (lock_fd == -1) return twarn("open"), 0;
665 lock.l_type = F_WRLCK;
666 lock.l_whence = SEEK_SET;
667 lock.l_start = 0;
668 lock.l_len = 0;
669 r = fcntl(lock_fd, F_SETLK, &lock);
670 if (r) return twarn("fcntl"), 0;
672 return 1;
675 void
676 binlog_init(job binlog_jobs)
678 int binlog_index_min;
679 struct stat sbuf;
680 int fd, idx, r;
681 size_t n;
682 char path[PATH_MAX];
683 binlog b;
685 if (!binlog_dir) return;
687 /* Recover any jobs in old binlogs */
689 if (stat(binlog_dir, &sbuf) < 0) {
690 if (mkdir(binlog_dir, 0700) < 0) return twarn("%s", binlog_dir);
691 } else if (!(sbuf.st_mode & S_IFDIR)) {
692 twarnx("%s", binlog_dir);
693 return;
696 binlog_index_min = binlog_scan_dir();
698 if (binlog_index_min) {
699 for (idx = binlog_index_min; idx <= binlog_index; idx++) {
700 r = snprintf(path, PATH_MAX, "%s/binlog.%d", binlog_dir, idx);
701 if (r > PATH_MAX) return twarnx("path too long: %s", binlog_dir);
703 fd = open(path, O_RDONLY);
705 if (fd < 0) {
706 twarn("%s", path);
707 } else {
708 b = binlog_iref(add_binlog(make_binlog(path)));
709 b->fd = fd;
710 binlog_read_log_file(b, binlog_jobs);
711 close(fd);
712 b->fd = -1;
713 binlog_dref(b);
720 /* Set up for writing out new jobs */
721 n = ensure_free_space(1);
722 if (!n) return twarnx("error making first writable binlog");
724 current_binlog = newest_binlog;
727 const char *
728 binlog_oldest_index()
730 if (!oldest_binlog) return "0";
732 return strrchr(oldest_binlog->path, '.') + 1;
735 const char *
736 binlog_current_index()
738 if (!newest_binlog) return "0";
740 return strrchr(newest_binlog->path, '.') + 1;