/* binlog.c - binary log implementation */

/* Copyright (C) 2008 Graham Barr
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#include <stdint.h>
#include <stdlib.h>
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <dirent.h>
#include <sys/uio.h>
#include <sys/stat.h>
#include <limits.h>
#include <stddef.h>

#include "dat.h"
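
/* Binlog files are kept on a singly linked list from oldest to newest.
 * Each file is reference counted: a job holds a reference to the file that
 * contains its most recent complete record, and a file is unlinked from
 * disk only once it is the oldest file and its count drops to zero. */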
typedef struct binlog *binlog;

struct binlog {
    binlog next;
    uint refs;
    int fd;
    size_t free;
    size_t reserved;
    char path[];
};
/* maximum size of a log file we will create */
size_t binlog_size_limit = BINLOG_SIZE_LIMIT_DEFAULT;

int enable_fsync = 0;
size_t fsync_throttle_ms = 0;
uint64 last_fsync = 0;

char *binlog_dir = NULL;
static int binlog_index = 0;
static int binlog_version = 6;
static int lock_fd;

static binlog oldest_binlog = 0,
              current_binlog = 0,
              newest_binlog = 0;

static const size_t job_record_size = offsetof(struct job, pad);
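
/* A binlog record, as written by binlog_write_job() and replayed by
 * binlog_read_log_file(), looks like this on disk:
 *
 *   size_t     namelen        (0 for updates to a job already logged)
 *   char[]     tube name      (namelen bytes, only when namelen > 0)
 *   struct job up to the pad  (job_record_size bytes)
 *   char[]     job body       (body_size bytes, only for new jobs)
 *
 * Each file begins with an int holding binlog_version. */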
static int
binlog_scan_dir()
{
    DIR *dirp;
    struct dirent *dp;
    long min = 0;
    long max = 0;
    long val;
    char *endptr;
    size_t name_len;

    dirp = opendir(binlog_dir);
    if (!dirp) return 0;

    while ((dp = readdir(dirp)) != NULL) {
        name_len = strlen(dp->d_name);
        if (name_len > 7 && !strncmp("binlog.", dp->d_name, 7)) {
            val = strtol(dp->d_name + 7, &endptr, 10);
            if (endptr && *endptr == 0) {
                if (max == 0 || val > max) max = val;
                if (min == 0 || val < min) min = val;
            }
        }
    }
    closedir(dirp);
    binlog_index = (int) max;
    return (int) min;
}
static void
binlog_remove_oldest()
{
    binlog b = oldest_binlog;

    if (!b) return;

    oldest_binlog = b->next;

    if (newest_binlog == b) {
        newest_binlog = b->next; /* == 0 */
    }

    unlink(b->path);
    free(b);
}
static binlog
binlog_iref(binlog b)
{
    if (b) b->refs++;
    return b;
}
static void
binlog_dref(binlog b)
{
    if (!b) return;
    if (b->refs < 1) return twarnx("refs is zero for binlog: %s", b->path);

    --b->refs;
    if (b->refs < 1) {
        while (oldest_binlog && oldest_binlog->refs == 0) {
            binlog_remove_oldest();
        }
    }
}
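
/* Warn about a damaged or truncated binlog at the current file offset and
 * keep going; recovery continues with whatever data could be read. */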
static void
binlog_warn(int fd, const char* path, const char *msg)
{
    warnx("WARNING, %s at %s:%u.\n%s", msg, path, lseek(fd, 0, SEEK_CUR),
          " Continuing. You may be missing data.");
}

#define binlog_warn(b, fmt, args...) \
    warnx("WARNING, " fmt " at %s:%u. %s: ", \
          ##args, b->path, lseek(b->fd, 0, SEEK_CUR), \
          "Continuing. You may be missing data.")
static void
binlog_read_log_file(binlog b, job binlog_jobs)
{
    struct job js;
    tube t;
    job j;
    char tubename[MAX_TUBE_NAME_LEN];
    size_t namelen;
    ssize_t r;
    int version;

    r = read(b->fd, &version, sizeof(version));
    if (r == -1) return twarn("read()");
    if (r < sizeof(version)) {
        return binlog_warn(b, "EOF while reading version record");
    }

    if (version != binlog_version) {
        return warnx("%s: binlog version mismatch %d %d", b->path, version,
                     binlog_version);
    }

    while (read(b->fd, &namelen, sizeof(size_t)) == sizeof(size_t)) {
        if (namelen >= MAX_TUBE_NAME_LEN) {
            return binlog_warn(b, "namelen %zu exceeds maximum of %d", namelen,
                               MAX_TUBE_NAME_LEN - 1);
        }

        if (namelen > 0) {
            r = read(b->fd, tubename, namelen);
            if (r == -1) return twarn("read()");
            if (r < namelen) {
                lseek(b->fd, 0, SEEK_CUR);
                return binlog_warn(b, "EOF while reading tube name");
            }
        }

        tubename[namelen] = '\0';
        r = read(b->fd, &js, job_record_size);
        if (r == -1) return twarn("read()");
        if (r < job_record_size) {
            return binlog_warn(b, "EOF while reading job record");
        }

        if (!js.id) break;

        j = job_find(js.id);
        switch (js.state) {
        case JOB_STATE_INVALID:
            if (j) {
                job_remove(j);
                binlog_dref(j->binlog);
                job_free(j);
                j = NULL;
            }
            break;
        case JOB_STATE_READY:
        case JOB_STATE_DELAYED:
            if (!j && namelen > 0) {
                t = tube_find_or_make(tubename);
                j = make_job_with_id(js.pri, js.delay, js.ttr, js.body_size,
                                     t, js.id);
                j->next = j->prev = j;
                j->created_at = js.created_at;
                job_insert(binlog_jobs, j);
            }
            if (js.body_size && namelen > 0) { /* namelen > 0 only on new jobs */
                if (js.body_size > j->body_size) {
                    warnx("job size increased from %zu to %zu", j->body_size,
                          js.body_size);
                    job_remove(j);
                    binlog_dref(j->binlog);
                    job_free(j);
                    return binlog_warn(b, "EOF while reading job body");
                }
                r = read(b->fd, j->body, js.body_size);
                if (r == -1) return twarn("read()");
                if (r < js.body_size) {
                    warnx("dropping incomplete job %llu", j->id);
                    job_remove(j);
                    binlog_dref(j->binlog);
                    job_free(j);
                    return binlog_warn(b, "EOF while reading job body");
                }
            }
            break;
        }
        if (j) {
            j->state = js.state;
            j->deadline_at = js.deadline_at;
            j->pri = js.pri;
            j->delay = js.delay;
            j->ttr = js.ttr;
            j->timeout_ct = js.timeout_ct;
            j->release_ct = js.release_ct;
            j->bury_ct = js.bury_ct;
            j->kick_ct = js.kick_ct;

            /* this is a complete record, so we can move the binlog ref */
            if (namelen && js.body_size) {
                binlog_dref(j->binlog);
                j->binlog = binlog_iref(b);
            }
        }
    }
}
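
/* Truncate the file back to the bytes actually used (dropping the
 * preallocated tail), close it, and drop the reference taken when the file
 * was opened for writing. */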
static void
binlog_close(binlog b)
{
    int r;

    if (!b) return;
    if (b->fd < 0) return;
    if (b->free) {
        // Some compilers give a warning if the return value of ftruncate is
        // ignored. So we pretend to use it.
        r = ftruncate(b->fd, binlog_size_limit - b->free);
        if (r == -1) {
            // Nothing we can do. The user might see warnings next startup.
        }
    }
    close(b->fd);
    b->fd = -1;
    binlog_dref(b);
}
static binlog
make_binlog(char *path)
{
    binlog b;

    b = (binlog) malloc(sizeof(struct binlog) + strlen(path) + 1);
    if (!b) return twarnx("OOM"), (binlog) 0;
    strcpy(b->path, path);
    b->refs = 0;
    b->next = NULL;
    b->fd = -1;
    b->free = 0;
    b->reserved = 0;
    return b;
}
static binlog
make_next_binlog()
{
    int r;
    char path[PATH_MAX];

    if (!binlog_dir) return NULL;

    r = snprintf(path, PATH_MAX, "%s/binlog.%d", binlog_dir, ++binlog_index);
    if (r >= PATH_MAX) return twarnx("path too long: %s", binlog_dir), (binlog) 0;

    return make_binlog(path);
}
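
/* Append b to the end of the binlog list and return it. */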
static binlog
add_binlog(binlog b)
{
    if (newest_binlog) newest_binlog->next = b;
    newest_binlog = b;
    if (!oldest_binlog) oldest_binlog = b;

    return b;
}
static void
binlog_open(binlog log, size_t *written)
{
    int fd, r;
    size_t bytes_written;

    if (written) *written = 0;

    if (!binlog_iref(log)) return;

    fd = open(log->path, O_WRONLY | O_CREAT, 0400);

    if (fd < 0) return twarn("Cannot open binlog %s", log->path);

    r = falloc(fd, binlog_size_limit);
    if (r) {
        close(fd);
        binlog_dref(log);
        errno = r;
        return twarn("Cannot allocate space for binlog %s", log->path);
    }

    bytes_written = write(fd, &binlog_version, sizeof(int));
    if (written) *written = bytes_written;

    if (bytes_written < sizeof(int)) {
        twarn("Cannot write to binlog");
        close(fd);
        binlog_dref(log);
        return;
    }

    log->fd = fd;
}
/* returns 1 on success, 0 on error. */
static int
binlog_use_next()
{
    binlog next;

    if (!current_binlog) return 0;

    next = current_binlog->next;

    if (!next) return 0;

    /* assert(current_binlog->reserved == 0); */

    binlog_close(current_binlog);
    current_binlog = next;

    return 1;
}
void
binlog_shutdown()
{
    binlog_use_next();
    binlog_close(current_binlog);
}
/* Returns the number of jobs successfully written (either 0 or 1).

   If this fails, something is seriously wrong. It should never fail because of
   a full disk. (The binlog_reserve_space_* functions, on the other hand, can
   fail because of a full disk.)

   If we are not using the binlog at all (!current_binlog), then we pretend to
   have made a successful write and return 1. */
int
binlog_write_job(job j)
{
    ssize_t written;
    size_t tube_namelen, to_write = 0;
    struct iovec vec[4], *vptr;
    int vcnt = 3, r;
    uint64 now;

    if (!current_binlog) return 1;
    tube_namelen = 0;

    vec[0].iov_base = (char *) &tube_namelen;
    to_write += vec[0].iov_len = sizeof(size_t);

    vec[1].iov_base = j->tube->name;
    vec[1].iov_len = 0;

    vec[2].iov_base = (char *) j;
    to_write += vec[2].iov_len = job_record_size;

    if (j->state == JOB_STATE_READY || j->state == JOB_STATE_DELAYED ||
        j->state == JOB_STATE_BURIED) {
        if (!j->binlog) {
            tube_namelen = strlen(j->tube->name);
            to_write += vec[1].iov_len = tube_namelen;
            vcnt = 4;
            vec[3].iov_base = j->body;
            to_write += vec[3].iov_len = j->body_size;
        }
    } else if (j->state == JOB_STATE_INVALID) {
        if (j->binlog) binlog_dref(j->binlog);
        j->binlog = NULL;
    } else {
        return twarnx("unserializable job state: %d", j->state), 0;
    }

    if (to_write > current_binlog->reserved) {
        r = binlog_use_next();
        if (!r) return twarnx("failed to use next binlog"), 0;
    }

    if (j->state && !j->binlog) j->binlog = binlog_iref(current_binlog);

    while (to_write > 0) {
        written = writev(current_binlog->fd, vec, vcnt);

        if (written < 0) {
            if (errno == EAGAIN) continue;
            if (errno == EINTR) continue;

            twarn("writev");
            binlog_close(current_binlog);
            current_binlog = 0;
            return 0;
        }

        current_binlog->reserved -= written;
        j->reserved_binlog_space -= written;

        to_write -= written;
        if (to_write > 0 && written > 0) {
            for (vptr = vec; written >= vptr->iov_len; vptr++) {
                written -= vptr->iov_len;
                vptr->iov_len = 0;
            }
            vptr->iov_base = (char *) vptr->iov_base + written;
            vptr->iov_len -= written;
        }
    }

    now = nanoseconds() / 1000000; /* ns -> ms */
    if (enable_fsync && now - last_fsync >= fsync_throttle_ms) {
        r = fsync(current_binlog->fd);
        if (r == -1) return twarn("fsync"), 0;
        last_fsync = now;
    }

    return 1;
}
static binlog
make_future_binlog()
{
    binlog b;
    size_t header;

    /* open a new binlog with more space to reserve */
    b = make_next_binlog();
    if (!b) return twarnx("error making next binlog"), (binlog) 0;
    binlog_open(b, &header);

    /* open failed, so we can't reserve any space */
    if (b->fd < 0) {
        free(b);
        return 0;
    }

    b->free = binlog_size_limit - header;
    b->reserved = 0;
    return b;
}
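
/* Reserved bytes can be shuffled between files as long as the totals are
 * conserved: moving n reserved bytes out of one binlog frees n bytes there
 * and consumes n bytes of free space in the target. */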
static int
can_move_reserved(size_t n, binlog from, binlog to)
{
    return from->reserved >= n && to->free >= n;
}

static void
move_reserved(size_t n, binlog from, binlog to)
{
    from->reserved -= n;
    from->free += n;
    to->reserved += n;
    to->free -= n;
}
static size_t
ensure_free_space(size_t n)
{
    binlog fb;

    if (newest_binlog && newest_binlog->free >= n) return n;

    /* open a new binlog */
    fb = make_future_binlog();
    if (!fb) return twarnx("make_future_binlog"), 0;

    add_binlog(fb);
    return n;
}
/* Ensures:
 *  1: b->reserved is congruent to n (mod z).
 *  2: all future binlogs ->reserved is congruent to 0 (mod z).
 * Returns t on success, and 0 on failure. (Therefore t should be nonzero.) */
static size_t
maintain_invariants_iter(binlog b, size_t n, size_t t)
{
    size_t reserved_later, remainder, complement, z, r;

    /* In this function, reserved bytes are conserved (they are neither created
     * nor destroyed). We just move them around to preserve the invariant. We
     * might have to create new free space (i.e. allocate a new binlog file),
     * though. */

    if (!b) return t;

    z = sizeof(size_t) + job_record_size;
    reserved_later = b->reserved - n;
    remainder = reserved_later % z;
    if (remainder == 0) return maintain_invariants_iter(b->next, 0, t);

    if (b == newest_binlog) {
        twarnx("newest binlog has invalid %zd reserved", b->reserved);
        /* We have failed, so undo the reservation and return 0. */
        if (newest_binlog->reserved >= t) {
            newest_binlog->reserved -= t;
        } else {
            twarnx("failed to unreserve %zd bytes", t); /* can't happen */
        }
        return 0;
    }

    complement = z - remainder;
    if (can_move_reserved(complement, newest_binlog, b)) {
        move_reserved(complement, newest_binlog, b);
        return maintain_invariants_iter(b->next, 0, t);
    }

    r = ensure_free_space(remainder);
    if (r != remainder) {
        twarnx("ensure_free_space");
        /* We have failed, so undo the reservation and return 0. */
        if (newest_binlog->reserved >= t) {
            newest_binlog->reserved -= t;
        } else {
            twarnx("failed to unreserve %zd bytes", t); /* can't happen */
        }
        return 0;
    }

    move_reserved(remainder, b, newest_binlog);
    return maintain_invariants_iter(b->next, 0, t);
}
/* Preserve some invariants immediately after any space reservation
 * (where z is the size of a delete record in the binlog).
 * Invariant 1: current_binlog->reserved >= n.
 * Invariant 2: current_binlog->reserved is congruent to n (mod z).
 * Invariant 3: all future binlogs ->reserved is congruent to 0 (mod z). */
static size_t
maintain_invariants(size_t n)
{
    size_t r;

    /* In this function, reserved bytes are conserved (they are neither created
     * nor destroyed). We just move them around to preserve the invariant. We
     * might have to create new free space (i.e. allocate a new binlog file),
     * though. */

    /* Invariant 1. */
    /* This is a loop, but it's guaranteed to run at most once. The proof is
     * left as an exercise for the reader. */
    while (current_binlog->reserved < n) {
        size_t to_move = current_binlog->reserved;

        r = ensure_free_space(to_move);
        if (r != to_move) {
            twarnx("ensure_free_space");
            /* We have failed, so undo the reservation and return 0. */
            if (newest_binlog->reserved >= n) {
                newest_binlog->reserved -= n;
            } else {
                twarnx("failed to unreserve %zd bytes", n); /* can't happen */
            }
            return 0;
        }

        move_reserved(to_move, current_binlog, newest_binlog);
        binlog_use_next();
    }

    /* Invariants 2 and 3. */
    return maintain_invariants_iter(current_binlog, n, n);
}
/* Returns the number of bytes successfully reserved: either 0 or n. */
static size_t
binlog_reserve_space(size_t n)
{
    size_t r;

    /* This return value must be nonzero but is otherwise ignored. */
    if (!current_binlog) return 1;

    if (current_binlog->free >= n) {
        current_binlog->free -= n;
        current_binlog->reserved += n;
        return maintain_invariants(n);
    }

    r = ensure_free_space(n);
    if (r != n) return twarnx("ensure_free_space"), 0;

    newest_binlog->free -= n;
    newest_binlog->reserved += n;
    return maintain_invariants(n);
}
/* Returns the number of bytes reserved. */
size_t
binlog_reserve_space_put(job j)
{
    size_t z = 0;

    /* reserve space for the initial job record */
    z += sizeof(size_t);
    z += strlen(j->tube->name);
    z += job_record_size;
    z += j->body_size;

    /* plus space for a delete to come later */
    z += sizeof(size_t);
    z += job_record_size;

    return binlog_reserve_space(z);
}
size_t
binlog_reserve_space_update(job j)
{
    size_t z = 0;

    z += sizeof(size_t);
    z += job_record_size;
    return binlog_reserve_space(z);
}
int
binlog_lock()
{
    int r;
    struct flock lock;
    char path[PATH_MAX];

    r = snprintf(path, PATH_MAX, "%s/lock", binlog_dir);
    if (r >= PATH_MAX) return twarnx("path too long: %s", binlog_dir), 0;

    lock_fd = open(path, O_WRONLY|O_CREAT, 0600);
    if (lock_fd == -1) return twarn("open"), 0;

    lock.l_type = F_WRLCK;
    lock.l_whence = SEEK_SET;
    lock.l_start = 0;
    lock.l_len = 0;
    r = fcntl(lock_fd, F_SETLK, &lock);
    if (r) return twarn("fcntl"), 0;

    return 1;
}
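
/* Replay any existing binlog files into binlog_jobs, then prepare the
 * newest file for writing new records. Does nothing unless a binlog
 * directory was configured. */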
void
binlog_init(job binlog_jobs)
{
    int binlog_index_min;
    struct stat sbuf;
    int fd, idx, r;
    size_t n;
    char path[PATH_MAX];
    binlog b;

    if (!binlog_dir) return;

    /* Recover any jobs in old binlogs */

    if (stat(binlog_dir, &sbuf) < 0) {
        if (mkdir(binlog_dir, 0700) < 0) return twarn("%s", binlog_dir);
    } else if (!(sbuf.st_mode & S_IFDIR)) {
        twarnx("%s", binlog_dir);
        return;
    }

    binlog_index_min = binlog_scan_dir();

    if (binlog_index_min) {
        for (idx = binlog_index_min; idx <= binlog_index; idx++) {
            r = snprintf(path, PATH_MAX, "%s/binlog.%d", binlog_dir, idx);
            if (r >= PATH_MAX) return twarnx("path too long: %s", binlog_dir);

            fd = open(path, O_RDONLY);

            if (fd < 0) {
                twarn("%s", path);
            } else {
                b = binlog_iref(add_binlog(make_binlog(path)));
                b->fd = fd;
                binlog_read_log_file(b, binlog_jobs);
                close(fd);
                b->fd = -1;
                binlog_dref(b);
            }
        }
    }

    /* Set up for writing out new jobs */
    n = ensure_free_space(1);
    if (!n) return twarnx("error making first writable binlog");

    current_binlog = newest_binlog;
}
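
/* The two functions below report the numeric suffixes of the oldest and
 * newest binlog files, as strings, for the stats output; "0" means no
 * binlog is in use. */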
const char *
binlog_oldest_index()
{
    if (!oldest_binlog) return "0";

    return strrchr(oldest_binlog->path, '.') + 1;
}
const char *
binlog_current_index()
{
    if (!newest_binlog) return "0";

    return strrchr(newest_binlog->path, '.') + 1;
}