Use less disk space (and update tests).
[beanstalkd.git] / binlog.c
blob0dd3a3c1a0d992bc3c43813d04af1ce4e8bd0fea
1 /* binlog.c - binary log implementation */
3 /* Copyright (C) 2008 Graham Barr
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #include <stdlib.h>
20 #include <stdio.h>
21 #include <fcntl.h>
22 #include <unistd.h>
23 #include <string.h>
24 #include <errno.h>
25 #include <dirent.h>
26 #include <sys/resource.h>
27 #include <sys/param.h>
28 #include <sys/uio.h>
29 #include <sys/stat.h>
30 #include <stdarg.h>
31 #include <limits.h>
32 #include <stddef.h>
34 #include "tube.h"
35 #include "job.h"
36 #include "binlog.h"
37 #include "util.h"
38 #include "config.h"
typedef struct binlog *binlog;

/* One binlog file. Binlogs form a singly linked list running from
 * oldest_binlog to newest_binlog. */
struct binlog {
    binlog next;        /* next (newer) binlog in the list, or NULL */
    unsigned int refs;  /* reference count; see binlog_iref/binlog_dref */
    int fd;             /* open file descriptor, or -1 when not open */
    size_t free;        /* bytes in the file not yet reserved */
    size_t reserved;    /* bytes reserved for records not yet written */
    char path[];        /* NUL-terminated path of the file */
};
51 /* max size we will create a log file */
52 size_t binlog_size_limit = BINLOG_SIZE_LIMIT_DEFAULT;
54 char *binlog_dir = NULL;
55 static int binlog_index = 0;
56 static int binlog_version = 3;
57 static int lock_fd;
59 static binlog oldest_binlog = 0,
60 current_binlog = 0,
61 newest_binlog = 0;
63 static const size_t job_record_size = offsetof(struct job, pad);
65 static int
66 binlog_scan_dir()
68 DIR *dirp;
69 struct dirent *dp;
70 long min = 0;
71 long max = 0;
72 long val;
73 char *endptr;
74 size_t name_len;
76 dirp = opendir(binlog_dir);
77 if (!dirp) return 0;
79 while ((dp = readdir(dirp)) != NULL) {
80 name_len = strlen(dp->d_name);
81 if (name_len > 7 && !strncmp("binlog.", dp->d_name, 7)) {
82 val = strtol(dp->d_name + 7, &endptr, 10);
83 if (endptr && *endptr == 0) {
84 if (max == 0 || val > max) max = val;
85 if (min == 0 || val < min) min = val;
90 closedir(dirp);
91 binlog_index = (int) max;
92 return (int) min;
95 static void
96 binlog_remove_oldest()
98 binlog b = oldest_binlog;
100 if (!b) return;
102 oldest_binlog = b->next;
104 unlink(b->path);
105 free(b);
108 static binlog
109 binlog_iref(binlog b)
111 if (b) b->refs++;
112 return b;
115 static void
116 binlog_dref(binlog b)
118 if (!b) return;
119 if (b->refs < 1) return twarnx("refs is zero for binlog: %s", b->path);
121 --b->refs;
122 if (b->refs < 1) {
123 while (oldest_binlog && oldest_binlog->refs == 0) {
124 binlog_remove_oldest();
/* Warn about a problem found at the current offset of fd in the file path.
 * NOTE(review): the macro of the same name defined right after this function
 * takes over for all later uses, so this function looks unused -- confirm
 * whether it is meant to be compiled at all. */
static void
binlog_warn(int fd, const char* path, const char *msg)
{
    /* lseek() returns off_t, which does not match %u; print it as a
     * long long instead. */
    warnx("WARNING, %s at %s:%lld.\n%s", msg, path,
          (long long) lseek(fd, 0, SEEK_CUR),
          " Continuing. You may be missing data.");
}
/* Warn about a problem in binlog b, reporting its path and the current file
 * offset. fmt must be a string literal; extra printf-style arguments may
 * follow. Expands to a single warnx() call expression.
 * The offset is off_t, so it is cast and printed with %lld rather than %u. */
#define binlog_warn(b, fmt, args...) \
    warnx("WARNING, " fmt " at %s:%lld. %s: ", \
          ##args, (b)->path, (long long) lseek((b)->fd, 0, SEEK_CUR), \
          "Continuing. You may be missing data.")
143 static void
144 binlog_read_log_file(binlog b, job binlog_jobs)
146 struct job js;
147 tube t;
148 job j;
149 char tubename[MAX_TUBE_NAME_LEN];
150 size_t namelen;
151 ssize_t r;
152 int version;
154 r = read(b->fd, &version, sizeof(version));
155 if (r == -1) return twarn("read()");
156 if (r < sizeof(version)) {
157 return binlog_warn(b, "EOF while reading version record");
160 if (version != binlog_version) {
161 return warnx("%s: binlog version mismatch %d %d", b->path, version,
162 binlog_version);
165 while (read(b->fd, &namelen, sizeof(size_t)) == sizeof(size_t)) {
166 if (namelen > 0) {
167 r = read(b->fd, tubename, namelen);
168 if (r == -1) return twarn("read()");
169 if (r < namelen) {
170 lseek(b->fd, SEEK_CUR, 0);
171 return binlog_warn(b, "EOF while reading tube name");
175 tubename[namelen] = '\0';
176 r = read(b->fd, &js, job_record_size);
177 if (r == -1) return twarn("read()");
178 if (r < job_record_size) {
179 return binlog_warn(b, "EOF while reading job record");
182 if (!js.id) break;
184 j = job_find(js.id);
185 switch (js.state) {
186 case JOB_STATE_INVALID:
187 if (j) {
188 job_remove(j);
189 binlog_dref(j->binlog);
190 job_free(j);
191 j = NULL;
193 break;
194 case JOB_STATE_READY:
195 case JOB_STATE_DELAYED:
196 if (!j && namelen > 0) {
197 t = tube_find_or_make(tubename);
198 j = make_job_with_id(js.pri, js.delay, js.ttr, js.body_size,
199 t, js.id);
200 j->next = j->prev = j;
201 j->creation = js.creation;
202 job_insert(binlog_jobs, j);
204 if (js.body_size) {
205 if (js.body_size > j->body_size) {
206 warnx("job size increased from %zu to %zu", j->body_size,
207 js.body_size);
208 job_remove(j);
209 binlog_dref(j->binlog);
210 job_free(j);
211 return binlog_warn(b, "EOF while reading job body");
213 r = read(b->fd, j->body, js.body_size);
214 if (r == -1) return twarn("read()");
215 if (r < js.body_size) {
216 warnx("dropping incomplete job %llu", j->id);
217 job_remove(j);
218 binlog_dref(j->binlog);
219 job_free(j);
220 return binlog_warn(b, "EOF while reading job body");
223 break;
225 if (j) {
226 j->state = js.state;
227 j->deadline = js.deadline;
228 j->pri = js.pri;
229 j->delay = js.delay;
230 j->ttr = js.ttr;
231 j->timeout_ct = js.timeout_ct;
232 j->release_ct = js.release_ct;
233 j->bury_ct = js.bury_ct;
234 j->kick_ct = js.kick_ct;
236 /* this is a complete record, so we can move the binlog ref */
237 if (namelen && js.body_size) {
238 binlog_dref(j->binlog);
239 j->binlog = binlog_iref(b);
245 static void
246 binlog_close(binlog b)
248 if (!b) return;
249 if (b->fd < 0) return;
250 close(b->fd);
251 b->fd = -1;
252 binlog_dref(b);
255 static binlog
256 make_binlog(char *path)
258 binlog b;
260 b = (binlog) malloc(sizeof(struct binlog) + strlen(path) + 1);
261 if (!b) return twarnx("OOM"), (binlog) 0;
262 strcpy(b->path, path);
263 b->refs = 0;
264 b->next = NULL;
265 b->fd = -1;
266 b->free = 0;
267 b->reserved = 0;
268 return b;
271 static binlog
272 make_next_binlog()
274 int r;
275 char path[PATH_MAX];
277 if (!binlog_dir) return NULL;
279 r = snprintf(path, PATH_MAX, "%s/binlog.%d", binlog_dir, ++binlog_index);
280 if (r > PATH_MAX) return twarnx("path too long: %s", binlog_dir), (binlog)0;
282 return make_binlog(path);
285 static binlog
286 add_binlog(binlog b)
288 if (newest_binlog) newest_binlog->next = b;
289 newest_binlog = b;
290 if (!oldest_binlog) oldest_binlog = b;
292 return b;
295 static void
296 binlog_open(binlog log, size_t *written)
298 int fd;
299 size_t bytes_written;
301 if (written) *written = 0;
303 if (!binlog_iref(log)) return;
305 fd = open(log->path, O_WRONLY | O_CREAT, 0400);
307 if (fd < 0) return twarn("Cannot open binlog %s", log->path);
309 #ifdef HAVE_POSIX_FALLOCATE
311 int r;
312 r = posix_fallocate(fd, 0, binlog_size_limit);
313 if (r) {
314 close(fd);
315 binlog_dref(log);
316 errno = r;
317 return twarn("Cannot allocate space for binlog %s", log->path);
320 #else
321 /* Allocate space in a slow but portable way. */
323 size_t i;
324 ssize_t w;
325 off_t p;
326 #define ZERO_BUF_SIZE 512
327 char buf[ZERO_BUF_SIZE] = {}; /* initialize to zero */
329 for (i = 0; i < binlog_size_limit; i += w) {
330 w = write(fd, &buf, ZERO_BUF_SIZE);
331 if (w == -1) {
332 twarn("Cannot allocate space for binlog %s", log->path);
333 close(fd);
334 binlog_dref(log);
335 return;
339 p = lseek(fd, 0, SEEK_SET);
340 if (p == -1) {
341 twarn("lseek");
342 close(fd);
343 binlog_dref(log);
344 return;
347 #endif
349 bytes_written = write(fd, &binlog_version, sizeof(int));
350 if (written) *written = bytes_written;
352 if (bytes_written < sizeof(int)) {
353 twarn("Cannot write to binlog");
354 close(fd);
355 binlog_dref(log);
356 return;
359 log->fd = fd;
362 /* returns 1 on success, 0 on error. */
363 static int
364 binlog_use_next()
366 binlog next;
368 if (!current_binlog) return 0;
370 next = current_binlog->next;
372 if (!next) return 0;
374 /* assert(current_binlog->reserved == 0); */
376 binlog_close(current_binlog);
377 current_binlog = next;
379 return 1;
382 void
383 binlog_shutdown()
385 binlog_use_next();
386 binlog_close(current_binlog);
389 /* Returns the number of jobs successfully written (either 0 or 1).
391 If this fails, something is seriously wrong. It should never fail because of
392 a full disk. (The binlog_reserve_space_* functions, on the other hand, can
393 fail because of a full disk.)
395 If we are not using the binlog at all (!current_binlog), then we pretend to
396 have made a successful write and return 1. */
398 binlog_write_job(job j)
400 ssize_t written;
401 size_t tube_namelen, to_write = 0;
402 struct iovec vec[4], *vptr;
403 int vcnt = 3, r;
405 if (!current_binlog) return 1;
406 tube_namelen = 0;
408 vec[0].iov_base = (char *) &tube_namelen;
409 to_write += vec[0].iov_len = sizeof(size_t);
411 vec[1].iov_base = j->tube->name;
412 vec[1].iov_len = 0;
414 vec[2].iov_base = (char *) j;
415 to_write += vec[2].iov_len = job_record_size;
417 if (j->state == JOB_STATE_READY || j->state == JOB_STATE_DELAYED) {
418 if (!j->binlog) {
419 tube_namelen = strlen(j->tube->name);
420 to_write += vec[1].iov_len = tube_namelen;
421 vcnt = 4;
422 vec[3].iov_base = j->body;
423 to_write += vec[3].iov_len = j->body_size;
425 } else if (j->state == JOB_STATE_INVALID) {
426 if (j->binlog) binlog_dref(j->binlog);
427 j->binlog = NULL;
428 } else {
429 return twarnx("unserializable job state: %d", j->state), 0;
432 if (to_write > current_binlog->reserved) {
433 r = binlog_use_next();
434 if (!r) return twarnx("failed to use next binlog"), 0;
437 if (j->state && !j->binlog) j->binlog = binlog_iref(current_binlog);
439 while (to_write > 0) {
440 written = writev(current_binlog->fd, vec, vcnt);
442 if (written < 0) {
443 if (errno == EAGAIN) continue;
444 if (errno == EINTR) continue;
446 twarn("writev");
447 binlog_close(current_binlog);
448 current_binlog = 0;
449 return 0;
452 to_write -= written;
453 if (to_write > 0 && written > 0) {
454 for (vptr = vec; written >= vptr->iov_len; vptr++) {
455 written -= vptr->iov_len;
456 vptr->iov_len = 0;
458 vptr->iov_base = (char *) vptr->iov_base + written;
459 vptr->iov_len -= written;
461 current_binlog->reserved -= written;
462 j->reserved_binlog_space -= written;
465 return 1;
468 static binlog
469 make_future_binlog()
471 binlog b;
472 size_t header;
474 /* open a new binlog with more space to reserve */
475 b = make_next_binlog();
476 if (!b) return twarnx("error making next binlog"), (binlog) 0;
477 binlog_open(b, &header);
479 /* open failed, so we can't reserve any space */
480 if (b->fd < 0) {
481 free(b);
482 return 0;
485 b->free = binlog_size_limit - header;
486 b->reserved = 0;
487 return b;
490 static int
491 can_move_reserved(size_t n, binlog from, binlog to)
493 return from->reserved >= n && to->free >= n;
496 static void
497 move_reserved(size_t n, binlog from, binlog to)
499 from->reserved -= n;
500 from->free += n;
501 to->reserved += n;
502 to->free -= n;
505 static size_t
506 ensure_free_space(size_t n)
508 binlog fb;
510 if (newest_binlog && newest_binlog->free >= n) return n;
512 /* open a new binlog */
513 fb = make_future_binlog();
514 if (!fb) return twarnx("make_future_binlog"), 0;
516 add_binlog(fb);
517 return n;
520 /* Preserve some invariants immediately after any space reservation.
521 * Invariant 1: current_binlog->reserved >= n.
522 * Invariant 2: current_binlog->reserved is congruent to n (mod z), where z
523 * is the size of a delete record in the binlog. */
524 static size_t
525 maintain_invariant(size_t n)
527 size_t reserved_later, remainder, complement, z, r;
529 /* In this function, reserved bytes are conserved (they are neither created
530 * nor destroyed). We just move them around to preserve the invariant. We
531 * might have to create new free space (i.e. allocate a new binlog file),
532 * though. */
534 /* Invariant 1. */
535 /* This is a loop, but it's guaranteed to run at most once. The proof is
536 * left as an exercise for the reader. */
537 while (current_binlog->reserved < n) {
538 size_t to_move = current_binlog->reserved;
540 r = ensure_free_space(to_move);
541 if (r != to_move) {
542 twarnx("ensure_free_space");
543 if (newest_binlog->reserved >= n) {
544 newest_binlog->reserved -= n;
545 } else {
546 twarnx("failed to unreserve %zd bytes", n); /* can't happen */
548 return 0;
551 move_reserved(to_move, current_binlog, newest_binlog);
552 binlog_use_next();
556 /* Invariant 2. */
558 z = sizeof(size_t) + job_record_size;
559 reserved_later = current_binlog->reserved - n;
560 remainder = reserved_later % z;
561 if (remainder == 0) return n;
562 complement = z - remainder;
563 if (can_move_reserved(complement, newest_binlog, current_binlog)) {
564 move_reserved(complement, newest_binlog, current_binlog);
565 return n;
568 r = ensure_free_space(remainder);
569 if (r != remainder) {
570 twarnx("ensure_free_space");
571 if (newest_binlog->reserved >= n) {
572 newest_binlog->reserved -= n;
573 } else {
574 twarnx("failed to unreserve %zd bytes", n); /* can't happen */
576 return 0;
578 move_reserved(remainder, current_binlog, newest_binlog);
580 return n;
583 /* Returns the number of bytes successfully reserved: either 0 or n. */
584 static size_t
585 binlog_reserve_space(size_t n)
587 size_t r;
589 /* This return value must be nonzero but is otherwise ignored. */
590 if (!current_binlog) return 1;
592 if (current_binlog->free >= n) {
593 current_binlog->free -= n;
594 current_binlog->reserved += n;
595 return maintain_invariant(n);
598 r = ensure_free_space(n);
599 if (r != n) return twarnx("ensure_free_space"), 0;
601 newest_binlog->free -= n;
602 newest_binlog->reserved += n;
603 return maintain_invariant(n);
606 /* Returns the number of bytes reserved. */
607 size_t
608 binlog_reserve_space_put(job j)
610 size_t z = 0;
612 /* reserve space for the initial job record */
613 z += sizeof(size_t);
614 z += strlen(j->tube->name);
615 z += job_record_size;
616 z += j->body_size;
618 /* plus space for a delete to come later */
619 z += sizeof(size_t);
620 z += job_record_size;
622 return binlog_reserve_space(z);
625 size_t
626 binlog_reserve_space_update(job j)
628 size_t z = 0;
630 z += sizeof(size_t);
631 z += job_record_size;
632 return binlog_reserve_space(z);
636 binlog_lock()
638 int r;
639 struct flock lock;
640 char path[PATH_MAX];
642 r = snprintf(path, PATH_MAX, "%s/lock", binlog_dir);
643 if (r > PATH_MAX) return twarnx("path too long: %s", binlog_dir), 0;
645 lock_fd = open(path, O_WRONLY|O_CREAT, 0600);
646 if (lock_fd == -1) return twarn("open"), 0;
648 lock.l_type = F_WRLCK;
649 lock.l_whence = SEEK_SET;
650 lock.l_start = 0;
651 lock.l_len = 0;
652 r = fcntl(lock_fd, F_SETLK, &lock);
653 if (r) return twarn("fcntl"), 0;
655 return 1;
658 void
659 binlog_init(job binlog_jobs)
661 int binlog_index_min;
662 struct stat sbuf;
663 int fd, idx, r;
664 size_t n;
665 char path[PATH_MAX];
666 binlog b;
668 if (!binlog_dir) return;
670 /* Recover any jobs in old binlogs */
672 if (stat(binlog_dir, &sbuf) < 0) {
673 if (mkdir(binlog_dir, 0700) < 0) return twarn("%s", binlog_dir);
674 } else if (!(sbuf.st_mode & S_IFDIR)) {
675 twarnx("%s", binlog_dir);
676 return;
679 binlog_index_min = binlog_scan_dir();
681 if (binlog_index_min) {
682 for (idx = binlog_index_min; idx <= binlog_index; idx++) {
683 r = snprintf(path, PATH_MAX, "%s/binlog.%d", binlog_dir, idx);
684 if (r > PATH_MAX) return twarnx("path too long: %s", binlog_dir);
686 fd = open(path, O_RDONLY);
688 if (fd < 0) {
689 twarn("%s", path);
690 } else {
691 b = binlog_iref(add_binlog(make_binlog(path)));
692 b->fd = fd;
693 binlog_read_log_file(b, binlog_jobs);
694 close(fd);
695 b->fd = -1;
696 binlog_dref(b);
703 /* Set up for writing out new jobs */
704 n = ensure_free_space(1);
705 if (!n) return twarnx("error making first writable binlog");
707 current_binlog = newest_binlog;
710 const char *
711 binlog_oldest_index()
713 if (!oldest_binlog) return "0";
715 return strrchr(oldest_binlog->path, '.') + 1;
718 const char *
719 binlog_current_index()
721 if (!newest_binlog) return "0";
723 return strrchr(newest_binlog->path, '.') + 1;