Add a disk full test; fix the bug it exposed.
[beanstalkd.git] / binlog.c
blob a3941de8e826267375a9ba3c449d325880365edb
1 /* binlog.c - binary log implementation */
3 /* Copyright (C) 2008 Graham Barr
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #include <stdlib.h>
20 #include <stdio.h>
21 #include <fcntl.h>
22 #include <unistd.h>
23 #include <string.h>
24 #include <errno.h>
25 #include <dirent.h>
26 #include <sys/resource.h>
27 #include <sys/param.h>
28 #include <sys/uio.h>
29 #include <sys/stat.h>
30 #include <stdarg.h>
31 #include <limits.h>
33 #include "tube.h"
34 #include "job.h"
35 #include "binlog.h"
36 #include "util.h"
37 #include "config.h"
/* Handle to one binlog file; the lists below chain these oldest-first. */
typedef struct binlog *binlog;

struct binlog {
    struct binlog *next;  /* next-newer binlog (set by add_binlog) */
    unsigned int refs;    /* held by jobs recorded here and while open */
    int fd;               /* open descriptor, or -1 when closed */
    size_t free;          /* bytes not yet promised to any record */
    size_t reserved;      /* bytes promised to records not yet written */
    char path[];          /* NUL-terminated file name (flexible member) */
};
50 /* max size we will create a log file */
51 size_t binlog_size_limit = BINLOG_SIZE_LIMIT_DEFAULT;
53 char *binlog_dir = NULL;
54 static int binlog_index = 0;
55 static int binlog_version = 2;
56 static int lock_fd;
58 static binlog oldest_binlog = 0,
59 current_binlog = 0,
60 newest_binlog = 0;
62 static int
63 binlog_scan_dir()
65 DIR *dirp;
66 struct dirent *dp;
67 long min = 0;
68 long max = 0;
69 long val;
70 char *endptr;
71 size_t name_len;
73 dirp = opendir(binlog_dir);
74 if (!dirp) return 0;
76 while ((dp = readdir(dirp)) != NULL) {
77 name_len = strlen(dp->d_name);
78 if (name_len > 7 && !strncmp("binlog.", dp->d_name, 7)) {
79 val = strtol(dp->d_name + 7, &endptr, 10);
80 if (endptr && *endptr == 0) {
81 if (max == 0 || val > max) max = val;
82 if (min == 0 || val < min) min = val;
87 closedir(dirp);
88 binlog_index = (int) max;
89 return (int) min;
92 static void
93 binlog_remove_oldest()
95 binlog b = oldest_binlog;
97 if (!b) return;
99 oldest_binlog = b->next;
101 unlink(b->path);
102 free(b);
105 static binlog
106 binlog_iref(binlog b)
108 if (b) b->refs++;
109 return b;
112 static void
113 binlog_dref(binlog b)
115 if (!b) return;
116 if (b->refs < 1) return twarnx("refs is zero for binlog: %s", b->path);
118 --b->refs;
119 if (b->refs < 1) {
120 while (oldest_binlog && oldest_binlog->refs == 0) {
121 binlog_remove_oldest();
/* Warn about a problem found while replaying binlog b, reporting the file
 * path and the current read offset. Extra printf-style arguments may follow
 * fmt (GNU named variadic macro; ", ##args" drops the comma when none are
 * given). Expands to a single warnx() call expression.
 *
 * lseek() returns an off_t, whose width differs between platforms. It is
 * cast to long long so the conversion specifier always matches (printing
 * it with %u was undefined behavior).
 *
 * A former function of the same name was never called: every use follows
 * this macro, which shadows it. It has been removed. */
#define binlog_warn(b, fmt, args...) \
    warnx("WARNING, " fmt " at %s:%lld. %s: ", \
          ##args, (b)->path, (long long) lseek((b)->fd, 0, SEEK_CUR), \
          "Continuing. You may be missing data.")
140 static void
141 binlog_read_log_file(binlog b, job binlog_jobs)
143 struct job js;
144 tube t;
145 job j;
146 char tubename[MAX_TUBE_NAME_LEN];
147 size_t namelen;
148 ssize_t r;
149 int version;
151 r = read(b->fd, &version, sizeof(version));
152 if (r == -1) return twarn("read()");
153 if (r < sizeof(version)) {
154 return binlog_warn(b, "EOF while reading version record");
157 if (version != binlog_version) {
158 return warnx("%s: binlog version mismatch %d %d", b->path, version,
159 binlog_version);
162 while (read(b->fd, &namelen, sizeof(size_t)) == sizeof(size_t)) {
163 if (namelen > 0) {
164 r = read(b->fd, tubename, namelen);
165 if (r == -1) return twarn("read()");
166 if (r < namelen) {
167 lseek(b->fd, SEEK_CUR, 0);
168 return binlog_warn(b, "EOF while reading tube name");
172 tubename[namelen] = '\0';
173 r = read(b->fd, &js, sizeof(struct job));
174 if (r == -1) return twarn("read()");
175 if (r < sizeof(struct job)) {
176 return binlog_warn(b, "EOF while reading job record");
179 if (!js.id) break;
181 j = job_find(js.id);
182 switch (js.state) {
183 case JOB_STATE_INVALID:
184 if (j) {
185 job_remove(j);
186 binlog_dref(j->binlog);
187 job_free(j);
188 j = NULL;
190 break;
191 case JOB_STATE_READY:
192 case JOB_STATE_DELAYED:
193 if (!j && namelen > 0) {
194 t = tube_find_or_make(tubename);
195 j = make_job_with_id(js.pri, js.delay, js.ttr, js.body_size,
196 t, js.id);
197 j->next = j->prev = j;
198 j->creation = js.creation;
199 job_insert(binlog_jobs, j);
201 if (js.body_size) {
202 if (js.body_size > j->body_size) {
203 warnx("job size increased from %zu to %zu", j->body_size,
204 js.body_size);
205 job_remove(j);
206 binlog_dref(j->binlog);
207 job_free(j);
208 return binlog_warn(b, "EOF while reading job body");
210 r = read(b->fd, j->body, js.body_size);
211 if (r == -1) return twarn("read()");
212 if (r < js.body_size) {
213 warnx("dropping incomplete job %llu", j->id);
214 job_remove(j);
215 binlog_dref(j->binlog);
216 job_free(j);
217 return binlog_warn(b, "EOF while reading job body");
220 break;
222 if (j) {
223 j->state = js.state;
224 j->deadline = js.deadline;
225 j->pri = js.pri;
226 j->delay = js.delay;
227 j->ttr = js.ttr;
228 j->timeout_ct = js.timeout_ct;
229 j->release_ct = js.release_ct;
230 j->bury_ct = js.bury_ct;
231 j->kick_ct = js.kick_ct;
233 /* this is a complete record, so we can move the binlog ref */
234 if (namelen && js.body_size) {
235 binlog_dref(j->binlog);
236 j->binlog = binlog_iref(b);
242 static void
243 binlog_close(binlog b)
245 if (!b) return;
246 if (b->fd < 0) return;
247 close(b->fd);
248 b->fd = -1;
249 binlog_dref(b);
252 static binlog
253 make_binlog(char *path)
255 binlog b;
257 b = (binlog) malloc(sizeof(struct binlog) + strlen(path) + 1);
258 if (!b) return twarnx("OOM"), (binlog) 0;
259 strcpy(b->path, path);
260 b->refs = 0;
261 b->next = NULL;
262 b->fd = -1;
263 b->free = 0;
264 b->reserved = 0;
265 return b;
268 static binlog
269 make_next_binlog()
271 int r;
272 char path[PATH_MAX];
274 if (!binlog_dir) return NULL;
276 r = snprintf(path, PATH_MAX, "%s/binlog.%d", binlog_dir, ++binlog_index);
277 if (r > PATH_MAX) return twarnx("path too long: %s", binlog_dir), (binlog)0;
279 return make_binlog(path);
282 static binlog
283 add_binlog(binlog b)
285 if (newest_binlog) newest_binlog->next = b;
286 newest_binlog = b;
287 if (!oldest_binlog) oldest_binlog = b;
289 return b;
292 static void
293 binlog_open(binlog log, size_t *written)
295 int fd;
296 size_t bytes_written;
298 if (written) *written = 0;
300 if (!binlog_iref(log)) return;
302 fd = open(log->path, O_WRONLY | O_CREAT, 0400);
304 if (fd < 0) return twarn("Cannot open binlog %s", log->path);
306 #ifdef HAVE_POSIX_FALLOCATE
308 int r;
309 r = posix_fallocate(fd, 0, binlog_size_limit);
310 if (r) {
311 close(fd);
312 binlog_dref(log);
313 errno = r;
314 return twarn("Cannot allocate space for binlog %s", log->path);
317 #else
318 /* Allocate space in a slow but portable way. */
320 size_t i;
321 ssize_t w;
322 off_t p;
323 #define ZERO_BUF_SIZE 512
324 char buf[ZERO_BUF_SIZE] = {}; /* initialize to zero */
326 for (i = 0; i < binlog_size_limit; i += w) {
327 w = write(fd, &buf, ZERO_BUF_SIZE);
328 if (w == -1) {
329 twarn("Cannot allocate space for binlog %s", log->path);
330 close(fd);
331 binlog_dref(log);
332 return;
336 p = lseek(fd, 0, SEEK_SET);
337 if (p == -1) {
338 twarn("lseek");
339 close(fd);
340 binlog_dref(log);
341 return;
344 #endif
346 bytes_written = write(fd, &binlog_version, sizeof(int));
347 if (written) *written = bytes_written;
349 if (bytes_written < sizeof(int)) {
350 twarn("Cannot write to binlog");
351 close(fd);
352 binlog_dref(log);
353 return;
356 log->fd = fd;
359 /* returns 1 on success, 0 on error. */
360 static int
361 binlog_use_next()
363 binlog next;
365 if (!current_binlog) return 0;
367 next = current_binlog->next;
369 if (!next) return 0;
371 /* assert(current_binlog->reserved == 0); */
373 binlog_close(current_binlog);
374 current_binlog = next;
376 return 1;
379 void
380 binlog_shutdown()
382 binlog_use_next();
383 binlog_close(current_binlog);
386 /* Returns the number of jobs successfully written (either 0 or 1).
388 If this fails, something is seriously wrong. It should never fail because of
389 a full disk. (The binlog_reserve_space_* functions, on the other hand, can
390 fail because of a full disk.)
392 If we are not using the binlog at all (!current_binlog), then we pretend to
393 have made a successful write and return 1. */
395 binlog_write_job(job j)
397 ssize_t written;
398 size_t tube_namelen, to_write = 0;
399 struct iovec vec[4], *vptr;
400 int vcnt = 3, r;
402 if (!current_binlog) return 1;
403 tube_namelen = 0;
405 vec[0].iov_base = (char *) &tube_namelen;
406 to_write += vec[0].iov_len = sizeof(size_t);
408 vec[1].iov_base = j->tube->name;
409 vec[1].iov_len = 0;
411 /* we could save some bytes in the binlog file by only saving some parts of
412 * the job struct */
413 vec[2].iov_base = (char *) j;
414 to_write += vec[2].iov_len = sizeof(struct job);
416 if (j->state == JOB_STATE_READY || j->state == JOB_STATE_DELAYED) {
417 if (!j->binlog) {
418 tube_namelen = strlen(j->tube->name);
419 to_write += vec[1].iov_len = tube_namelen;
420 vcnt = 4;
421 vec[3].iov_base = j->body;
422 to_write += vec[3].iov_len = j->body_size;
424 } else if (j->state == JOB_STATE_INVALID) {
425 if (j->binlog) binlog_dref(j->binlog);
426 j->binlog = NULL;
427 } else {
428 return twarnx("unserializable job state: %d", j->state), 0;
431 if (to_write > current_binlog->reserved) {
432 r = binlog_use_next();
433 if (!r) return twarnx("failed to use next binlog"), 0;
436 if (j->state && !j->binlog) j->binlog = binlog_iref(current_binlog);
438 while (to_write > 0) {
439 written = writev(current_binlog->fd, vec, vcnt);
441 if (written < 0) {
442 if (errno == EAGAIN) continue;
443 if (errno == EINTR) continue;
445 twarn("writev");
446 binlog_close(current_binlog);
447 current_binlog = 0;
448 return 0;
451 to_write -= written;
452 if (to_write > 0 && written > 0) {
453 for (vptr = vec; written >= vptr->iov_len; vptr++) {
454 written -= vptr->iov_len;
455 vptr->iov_len = 0;
457 vptr->iov_base = (char *) vptr->iov_base + written;
458 vptr->iov_len -= written;
460 current_binlog->reserved -= written;
461 j->reserved_binlog_space -= written;
464 return 1;
467 static binlog
468 make_future_binlog()
470 binlog b;
471 size_t header;
473 /* open a new binlog with more space to reserve */
474 b = make_next_binlog();
475 if (!b) return twarnx("error making next binlog"), (binlog) 0;
476 binlog_open(b, &header);
478 /* open failed, so we can't reserve any space */
479 if (b->fd < 0) {
480 free(b);
481 return 0;
484 b->free = binlog_size_limit - header;
485 b->reserved = 0;
486 return b;
489 static int
490 can_move_reserved(size_t n, binlog from, binlog to)
492 return from->reserved >= n && to->free >= n;
495 static void
496 move_reserved(size_t n, binlog from, binlog to)
498 from->reserved -= n;
499 from->free += n;
500 to->reserved += n;
501 to->free -= n;
504 static size_t
505 ensure_free_space(size_t n)
507 binlog fb;
509 if (newest_binlog && newest_binlog->free >= n) return n;
511 /* open a new binlog */
512 fb = make_future_binlog();
513 if (!fb) return twarnx("make_future_binlog"), 0;
515 add_binlog(fb);
516 return n;
519 /* Preserve some invariants immediately after any space reservation.
520 * Invariant 1: current_binlog->reserved >= n.
521 * Invariant 2: current_binlog->reserved is congruent to n (mod z), where z
522 * is the size of a delete record in the binlog. */
523 static size_t
524 maintain_invariant(size_t n)
526 size_t reserved_later, remainder, complement, z, r;
528 /* In this function, reserved bytes are conserved (they are neither created
529 * nor destroyed). We just move them around to preserve the invariant. We
530 * might have to create new free space (i.e. allocate a new binlog file),
531 * though. */
533 /* Invariant 1. */
534 /* This is a loop, but it's guaranteed to run at most once. The proof is
535 * left as an exercise for the reader. */
536 while (current_binlog->reserved < n) {
537 size_t to_move = current_binlog->reserved;
539 r = ensure_free_space(to_move);
540 if (r != to_move) {
541 twarnx("ensure_free_space");
542 if (newest_binlog->reserved >= n) {
543 newest_binlog->reserved -= n;
544 } else {
545 twarnx("failed to unreserve %zd bytes", n); /* can't happen */
547 return 0;
550 move_reserved(to_move, current_binlog, newest_binlog);
551 binlog_use_next();
555 /* Invariant 2. */
557 z = sizeof(size_t) + sizeof(struct job);
558 reserved_later = current_binlog->reserved - n;
559 remainder = reserved_later % z;
560 if (remainder == 0) return n;
561 complement = z - remainder;
562 if (can_move_reserved(complement, newest_binlog, current_binlog)) {
563 move_reserved(complement, newest_binlog, current_binlog);
564 return n;
567 r = ensure_free_space(remainder);
568 if (r != remainder) {
569 twarnx("ensure_free_space");
570 if (newest_binlog->reserved >= n) {
571 newest_binlog->reserved -= n;
572 } else {
573 twarnx("failed to unreserve %zd bytes", n); /* can't happen */
575 return 0;
577 move_reserved(remainder, current_binlog, newest_binlog);
579 return n;
582 /* Returns the number of bytes successfully reserved: either 0 or n. */
583 static size_t
584 binlog_reserve_space(size_t n)
586 size_t r;
588 /* This return value must be nonzero but is otherwise ignored. */
589 if (!current_binlog) return 1;
591 if (current_binlog->free >= n) {
592 current_binlog->free -= n;
593 current_binlog->reserved += n;
594 return maintain_invariant(n);
597 r = ensure_free_space(n);
598 if (r != n) return twarnx("ensure_free_space"), 0;
600 newest_binlog->free -= n;
601 newest_binlog->reserved += n;
602 return maintain_invariant(n);
605 /* Returns the number of bytes reserved. */
606 size_t
607 binlog_reserve_space_put(job j)
609 size_t z = 0;
611 /* reserve space for the initial job record */
612 z += sizeof(size_t);
613 z += strlen(j->tube->name);
614 z += sizeof(struct job);
615 z += j->body_size;
617 /* plus space for a delete to come later */
618 z += sizeof(size_t);
619 z += sizeof(struct job);
621 return binlog_reserve_space(z);
624 size_t
625 binlog_reserve_space_update(job j)
627 size_t z = 0;
629 z += sizeof(size_t);
630 z += sizeof(struct job);
631 return binlog_reserve_space(z);
635 binlog_lock()
637 int r;
638 struct flock lock;
639 char path[PATH_MAX];
641 r = snprintf(path, PATH_MAX, "%s/lock", binlog_dir);
642 if (r > PATH_MAX) return twarnx("path too long: %s", binlog_dir), 0;
644 lock_fd = open(path, O_WRONLY|O_CREAT, 0600);
645 if (lock_fd == -1) return twarn("open"), 0;
647 lock.l_type = F_WRLCK;
648 lock.l_whence = SEEK_SET;
649 lock.l_start = 0;
650 lock.l_len = 0;
651 r = fcntl(lock_fd, F_SETLK, &lock);
652 if (r) return twarn("fcntl"), 0;
654 return 1;
657 void
658 binlog_init(job binlog_jobs)
660 int binlog_index_min;
661 struct stat sbuf;
662 int fd, idx, r;
663 size_t n;
664 char path[PATH_MAX];
665 binlog b;
667 if (!binlog_dir) return;
669 /* Recover any jobs in old binlogs */
671 if (stat(binlog_dir, &sbuf) < 0) {
672 if (mkdir(binlog_dir, 0700) < 0) return twarn("%s", binlog_dir);
673 } else if (!(sbuf.st_mode & S_IFDIR)) {
674 twarnx("%s", binlog_dir);
675 return;
678 binlog_index_min = binlog_scan_dir();
680 if (binlog_index_min) {
681 for (idx = binlog_index_min; idx <= binlog_index; idx++) {
682 r = snprintf(path, PATH_MAX, "%s/binlog.%d", binlog_dir, idx);
683 if (r > PATH_MAX) return twarnx("path too long: %s", binlog_dir);
685 fd = open(path, O_RDONLY);
687 if (fd < 0) {
688 twarn("%s", path);
689 } else {
690 b = binlog_iref(add_binlog(make_binlog(path)));
691 b->fd = fd;
692 binlog_read_log_file(b, binlog_jobs);
693 close(fd);
694 b->fd = -1;
695 binlog_dref(b);
702 /* Set up for writing out new jobs */
703 n = ensure_free_space(1);
704 if (!n) return twarnx("error making first writable binlog");
706 current_binlog = newest_binlog;
709 const char *
710 binlog_oldest_index()
712 if (!oldest_binlog) return "0";
714 return strrchr(oldest_binlog->path, '.') + 1;
717 const char *
718 binlog_current_index()
720 if (!newest_binlog) return "0";
722 return strrchr(newest_binlog->path, '.') + 1;