allow writing reserved jobs; treat them as ready
[beanstalkd.git] / binlog.c
blob c9f21ea4328dba8fd2c689719fe9dbd8e71a60fc
/* binlog.c - binary log implementation */

/* Copyright (C) 2008 Graham Barr
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <stdint.h>
#include <stdlib.h>
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <dirent.h>
#include <sys/uio.h>
#include <sys/stat.h>
#include <limits.h>
#include <stddef.h>
#include "dat.h"

typedef struct binlog *binlog;

struct binlog {
    binlog next;
    uint refs;
    int fd;
    size_t free;
    size_t reserved;
    char path[];
};

/* max size we will create a log file */
size_t binlog_size_limit = BINLOG_SIZE_LIMIT_DEFAULT;

int enable_fsync = 0;
size_t fsync_throttle_ms = 0;
uint64 last_fsync = 0;

char *binlog_dir = NULL;
static int binlog_index = 0;
static int binlog_version = 6;
static int lock_fd;

static binlog oldest_binlog = 0,
              current_binlog = 0,
              newest_binlog = 0;

static const size_t job_record_size = offsetof(struct job, pad);
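
/* A sketch of the on-disk format, as implied by binlog_open(),
 * binlog_read_log_file(), and binlog_write_job() below (fields are written in
 * host byte order straight from memory, so a binlog is only portable between
 * hosts with the same ABI):
 *
 *   int    version;             file header; must equal binlog_version
 *   then, repeated until end of file or a record whose job id is 0:
 *     size_t namelen;           0 for updates/deletes of already-logged jobs
 *     char   name[namelen];     tube name, present only when namelen > 0
 *     struct job (the first job_record_size bytes, up to the pad member)
 *     char   body[body_size];   present only when namelen > 0
 *
 * Binlog files form a singly linked list from oldest_binlog to newest_binlog;
 * current_binlog points at the file currently being written. */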

static int
binlog_scan_dir()
{
    DIR *dirp;
    struct dirent *dp;
    long min = 0;
    long max = 0;
    long val;
    char *endptr;
    size_t name_len;

    dirp = opendir(binlog_dir);
    if (!dirp) return 0;

    while ((dp = readdir(dirp)) != NULL) {
        name_len = strlen(dp->d_name);
        if (name_len > 7 && !strncmp("binlog.", dp->d_name, 7)) {
            val = strtol(dp->d_name + 7, &endptr, 10);
            if (endptr && *endptr == 0) {
                if (max == 0 || val > max) max = val;
                if (min == 0 || val < min) min = val;
            }
        }
    }

    closedir(dirp);
    binlog_index = (int) max;
    return (int) min;
}

static void
binlog_remove_oldest()
{
    binlog b = oldest_binlog;

    if (!b) return;

    oldest_binlog = b->next;
    if (newest_binlog == b) {
        newest_binlog = b->next; /* == 0 */
    }

    unlink(b->path);
    free(b);
}

static binlog
binlog_iref(binlog b)
{
    if (b) b->refs++;
    return b;
}

static void
binlog_dref(binlog b)
{
    if (!b) return;
    if (b->refs < 1) return twarnx("refs is zero for binlog: %s", b->path);

    --b->refs;
    if (b->refs < 1) {
        while (oldest_binlog && oldest_binlog->refs == 0) {
            binlog_remove_oldest();
        }
    }
}

static void
binlog_warn(int fd, const char* path, const char *msg)
{
    warnx("WARNING, %s at %s:%u.\n%s", msg, path, lseek(fd, 0, SEEK_CUR),
          " Continuing. You may be missing data.");
}

#define binlog_warn(b, fmt, args...) \
    warnx("WARNING, " fmt " at %s:%u. %s: ", \
          ##args, b->path, lseek(b->fd, 0, SEEK_CUR), \
          "Continuing. You may be missing data.")

static void
binlog_read_log_file(binlog b, job binlog_jobs)
{
    struct job js;
    tube t;
    job j;
    char tubename[MAX_TUBE_NAME_LEN];
    size_t namelen;
    ssize_t r;
    int version;

    r = read(b->fd, &version, sizeof(version));
    if (r == -1) return twarn("read()");
    if (r < sizeof(version)) {
        return binlog_warn(b, "EOF while reading version record");
    }

    if (version != binlog_version) {
        return warnx("%s: binlog version mismatch %d %d", b->path, version,
                     binlog_version);
    }

    while (read(b->fd, &namelen, sizeof(size_t)) == sizeof(size_t)) {
        if (namelen >= MAX_TUBE_NAME_LEN) {
            return binlog_warn(b, "namelen %zu exceeds maximum of %d", namelen, MAX_TUBE_NAME_LEN - 1);
        }

        if (namelen > 0) {
            r = read(b->fd, tubename, namelen);
            if (r == -1) return twarn("read()");
            if (r < namelen) {
                lseek(b->fd, 0, SEEK_CUR);
                return binlog_warn(b, "EOF while reading tube name");
            }
        }

        tubename[namelen] = '\0';
        r = read(b->fd, &js, job_record_size);
        if (r == -1) return twarn("read()");
        if (r < job_record_size) {
            return binlog_warn(b, "EOF while reading job record");
        }

        if (!js.id) break;

        j = job_find(js.id);
        switch (js.state) {
        case JOB_STATE_INVALID:
            if (j) {
                job_remove(j);
                binlog_dref(j->binlog);
                job_free(j);
                j = NULL;
            }
            break;
        case JOB_STATE_RESERVED:
            js.state = JOB_STATE_READY; /* fall through: recover reserved jobs as ready */
        case JOB_STATE_READY:
        case JOB_STATE_DELAYED:
            if (!j && namelen > 0) {
                t = tube_find_or_make(tubename);
                j = make_job_with_id(js.pri, js.delay, js.ttr, js.body_size,
                                     t, js.id);
                j->next = j->prev = j;
                j->created_at = js.created_at;
                job_insert(binlog_jobs, j);
            }
            if (js.body_size && namelen > 0) { /* namelen > 0 only on new jobs */
                if (js.body_size > j->body_size) {
                    warnx("job size increased from %zu to %zu", j->body_size,
                          js.body_size);
                    job_remove(j);
                    binlog_dref(j->binlog);
                    job_free(j);
                    return binlog_warn(b, "EOF while reading job body");
                }
                r = read(b->fd, j->body, js.body_size);
                if (r == -1) return twarn("read()");
                if (r < js.body_size) {
                    warnx("dropping incomplete job %llu", j->id);
                    job_remove(j);
                    binlog_dref(j->binlog);
                    job_free(j);
                    return binlog_warn(b, "EOF while reading job body");
                }
            }
            break;
        }
        if (j) {
            j->state = js.state;
            j->deadline_at = js.deadline_at;
            j->pri = js.pri;
            j->delay = js.delay;
            j->ttr = js.ttr;
            j->timeout_ct = js.timeout_ct;
            j->release_ct = js.release_ct;
            j->bury_ct = js.bury_ct;
            j->kick_ct = js.kick_ct;

            /* this is a complete record, so we can move the binlog ref */
            if (namelen && js.body_size) {
                binlog_dref(j->binlog);
                j->binlog = binlog_iref(b);
            }
        }
    }
}

static void
binlog_close(binlog b)
{
    int r;

    if (!b) return;
    if (b->fd < 0) return;
    if (b->free) {
        // Some compilers give a warning if the return value of ftruncate is
        // ignored. So we pretend to use it.
        r = ftruncate(b->fd, binlog_size_limit - b->free);
        if (r == -1) {
            // Nothing we can do. The user might see warnings next startup.
        }
    }
    close(b->fd);
    b->fd = -1;
    binlog_dref(b);
}

static binlog
make_binlog(char *path)
{
    binlog b;

    b = (binlog) malloc(sizeof(struct binlog) + strlen(path) + 1);
    if (!b) return twarnx("OOM"), (binlog) 0;
    strcpy(b->path, path);
    b->refs = 0;
    b->next = NULL;
    b->fd = -1;
    b->free = 0;
    b->reserved = 0;
    return b;
}

static binlog
make_next_binlog()
{
    int r;
    char path[PATH_MAX];

    if (!binlog_dir) return NULL;

    r = snprintf(path, PATH_MAX, "%s/binlog.%d", binlog_dir, ++binlog_index);
    if (r > PATH_MAX) return twarnx("path too long: %s", binlog_dir), (binlog)0;

    return make_binlog(path);
}

static binlog
add_binlog(binlog b)
{
    if (newest_binlog) newest_binlog->next = b;
    newest_binlog = b;
    if (!oldest_binlog) oldest_binlog = b;

    return b;
}

static void
binlog_open(binlog log, size_t *written)
{
    int fd, r;
    size_t bytes_written;

    if (written) *written = 0;

    if (!binlog_iref(log)) return;

    fd = open(log->path, O_WRONLY | O_CREAT, 0400);

    if (fd < 0) return twarn("Cannot open binlog %s", log->path);

    r = falloc(fd, binlog_size_limit);
    if (r) {
        close(fd);
        binlog_dref(log);
        errno = r;
        return twarn("Cannot allocate space for binlog %s", log->path);
    }

    bytes_written = write(fd, &binlog_version, sizeof(int));
    if (written) *written = bytes_written;

    if (bytes_written < sizeof(int)) {
        twarn("Cannot write to binlog");
        close(fd);
        binlog_dref(log);
        return;
    }

    log->fd = fd;
}

/* returns 1 on success, 0 on error. */
static int
binlog_use_next()
{
    binlog next;

    if (!current_binlog) return 0;

    next = current_binlog->next;

    if (!next) return 0;

    /* assert(current_binlog->reserved == 0); */

    binlog_close(current_binlog);
    current_binlog = next;

    return 1;
}

void
binlog_shutdown()
{
    binlog_use_next();
    binlog_close(current_binlog);
}

/* Returns the number of jobs successfully written (either 0 or 1).

   If this fails, something is seriously wrong. It should never fail because of
   a full disk. (The binlog_reserve_space_* functions, on the other hand, can
   fail because of a full disk.)

   If we are not using the binlog at all (!current_binlog), then we pretend to
   have made a successful write and return 1. */
int
binlog_write_job(job j)
{
    ssize_t written;
    size_t tube_namelen, to_write = 0;
    struct iovec vec[4], *vptr;
    int vcnt = 3, r;
    uint64 now;

    if (!current_binlog) return 1;
    tube_namelen = 0;

    vec[0].iov_base = (char *) &tube_namelen;
    to_write += vec[0].iov_len = sizeof(size_t);

    vec[1].iov_base = j->tube->name;
    vec[1].iov_len = 0;

    vec[2].iov_base = (char *) j;
    to_write += vec[2].iov_len = job_record_size;

    if (j->state == JOB_STATE_READY || j->state == JOB_STATE_DELAYED ||
        j->state == JOB_STATE_BURIED || j->state == JOB_STATE_RESERVED) {
        if (!j->binlog) {
            tube_namelen = strlen(j->tube->name);
            to_write += vec[1].iov_len = tube_namelen;
            vcnt = 4;
            vec[3].iov_base = j->body;
            to_write += vec[3].iov_len = j->body_size;
        }
    } else if (j->state == JOB_STATE_INVALID) {
        if (j->binlog) binlog_dref(j->binlog);
        j->binlog = NULL;
    } else {
        return twarnx("unserializable job state: %d", j->state), 0;
    }

    if (to_write > current_binlog->reserved) {
        r = binlog_use_next();
        if (!r) return twarnx("failed to use next binlog"), 0;
    }

    if (j->state && !j->binlog) j->binlog = binlog_iref(current_binlog);

    while (to_write > 0) {
        written = writev(current_binlog->fd, vec, vcnt);

        if (written < 0) {
            if (errno == EAGAIN) continue;
            if (errno == EINTR) continue;

            twarn("writev");
            binlog_close(current_binlog);
            current_binlog = 0;
            return 0;
        }

        current_binlog->reserved -= written;
        j->reserved_binlog_space -= written;

        to_write -= written;
        if (to_write > 0 && written > 0) {
            for (vptr = vec; written >= vptr->iov_len; vptr++) {
                written -= vptr->iov_len;
                vptr->iov_len = 0;
            }
            vptr->iov_base = (char *) vptr->iov_base + written;
            vptr->iov_len -= written;
        }
    }

    now = nanoseconds() / 1000000; /* ns -> ms */
    if (enable_fsync && now - last_fsync >= fsync_throttle_ms) {
        r = fsync(current_binlog->fd);
        if (r == -1) return twarn("fsync"), 0;
        last_fsync = now;
    }

    return 1;
}
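
/* Sketch of the intended call pattern (illustrative only; the actual call
 * sites live in beanstalkd's protocol layer and may differ in detail):
 *
 *     if (!binlog_reserve_space_put(j)) ...   refuse the put: no disk space
 *     ... enqueue the job ...
 *     binlog_write_job(j);                    must not fail for lack of space,
 *                                             because space was reserved above
 */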

static binlog
make_future_binlog()
{
    binlog b;
    size_t header;

    /* open a new binlog with more space to reserve */
    b = make_next_binlog();
    if (!b) return twarnx("error making next binlog"), (binlog) 0;
    binlog_open(b, &header);

    /* open failed, so we can't reserve any space */
    if (b->fd < 0) {
        free(b);
        return 0;
    }

    b->free = binlog_size_limit - header;
    b->reserved = 0;
    return b;
}

static int
can_move_reserved(size_t n, binlog from, binlog to)
{
    return from->reserved >= n && to->free >= n;
}

static void
move_reserved(size_t n, binlog from, binlog to)
{
    from->reserved -= n;
    from->free += n;
    to->reserved += n;
    to->free -= n;
}

static size_t
ensure_free_space(size_t n)
{
    binlog fb;

    if (newest_binlog && newest_binlog->free >= n) return n;

    /* open a new binlog */
    fb = make_future_binlog();
    if (!fb) return twarnx("make_future_binlog"), 0;

    add_binlog(fb);
    return n;
}

/* Ensures:
 * 1: b->reserved is congruent to n (mod z).
 * 2: all future binlogs ->reserved is congruent to 0 (mod z).
 * Returns t on success, and 0 on failure. (Therefore t should be nonzero.) */
static size_t
maintain_invariants_iter(binlog b, size_t n, size_t t)
{
    size_t reserved_later, remainder, complement, z, r;

    /* In this function, reserved bytes are conserved (they are neither created
     * nor destroyed). We just move them around to preserve the invariant. We
     * might have to create new free space (i.e. allocate a new binlog file),
     * though. */

    if (!b) return t;

    z = sizeof(size_t) + job_record_size;
    reserved_later = b->reserved - n;
    remainder = reserved_later % z;
    if (remainder == 0) return maintain_invariants_iter(b->next, 0, t);

    if (b == newest_binlog) {
        twarnx("newest binlog has invalid %zd reserved", b->reserved);
        /* We have failed, so undo the reservation and return 0. */
        if (newest_binlog->reserved >= t) {
            newest_binlog->reserved -= t;
        } else {
            twarnx("failed to unreserve %zd bytes", t); /* can't happen */
        }
        return 0;
    }

    complement = z - remainder;
    if (can_move_reserved(complement, newest_binlog, b)) {
        move_reserved(complement, newest_binlog, b);
        return maintain_invariants_iter(b->next, 0, t);
    }

    r = ensure_free_space(remainder);
    if (r != remainder) {
        twarnx("ensure_free_space");
        /* We have failed, so undo the reservation and return 0. */
        if (newest_binlog->reserved >= t) {
            newest_binlog->reserved -= t;
        } else {
            twarnx("failed to unreserve %zd bytes", t); /* can't happen */
        }
        return 0;
    }

    move_reserved(remainder, b, newest_binlog);
    return maintain_invariants_iter(b->next, 0, t);
}
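
/* Worked example of the invariant above (numbers are made up for
 * illustration; z depends on job_record_size). Suppose z = 120, a fresh
 * reservation is n = 1000, and the current binlog ends up with reserved =
 * 1024, i.e. 24 leftover bytes from earlier records. Then reserved_later =
 * 24, remainder = 24, complement = 96. The iterator either pulls 96 reserved
 * bytes back from the newest binlog (leftover becomes one whole delete
 * record), or pushes the 24 leftover bytes forward to the newest binlog.
 * Either way the current binlog's reserved count is congruent to n (mod z)
 * and every later binlog stays at a multiple of z, presumably so that space
 * reserved ahead is always a whole number of delete records. */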

/* Preserve some invariants immediately after any space reservation
 * (where z is the size of a delete record in the binlog).
 * Invariant 1: current_binlog->reserved >= n.
 * Invariant 2: current_binlog->reserved is congruent to n (mod z).
 * Invariant 3: all future binlogs ->reserved is congruent to 0 (mod z). */
static size_t
maintain_invariants(size_t n)
{
    size_t r;

    /* In this function, reserved bytes are conserved (they are neither created
     * nor destroyed). We just move them around to preserve the invariant. We
     * might have to create new free space (i.e. allocate a new binlog file),
     * though. */

    /* Invariant 1. */
    /* This is a loop, but it's guaranteed to run at most once. The proof is
     * left as an exercise for the reader. */
    while (current_binlog->reserved < n) {
        size_t to_move = current_binlog->reserved;

        r = ensure_free_space(to_move);
        if (r != to_move) {
            twarnx("ensure_free_space");
            /* We have failed, so undo the reservation and return 0. */
            if (newest_binlog->reserved >= n) {
                newest_binlog->reserved -= n;
            } else {
                twarnx("failed to unreserve %zd bytes", n); /* can't happen */
            }
            return 0;
        }

        move_reserved(to_move, current_binlog, newest_binlog);
        binlog_use_next();
    }

    /* Invariants 2 and 3. */
    return maintain_invariants_iter(current_binlog, n, n);
}

/* Returns the number of bytes successfully reserved: either 0 or n. */
static size_t
binlog_reserve_space(size_t n)
{
    size_t r;

    /* This return value must be nonzero but is otherwise ignored. */
    if (!current_binlog) return 1;

    if (current_binlog->free >= n) {
        current_binlog->free -= n;
        current_binlog->reserved += n;
        return maintain_invariants(n);
    }

    r = ensure_free_space(n);
    if (r != n) return twarnx("ensure_free_space"), 0;

    newest_binlog->free -= n;
    newest_binlog->reserved += n;
    return maintain_invariants(n);
}

/* Returns the number of bytes reserved. */
size_t
binlog_reserve_space_put(job j)
{
    size_t z = 0;

    /* reserve space for the initial job record */
    z += sizeof(size_t);
    z += strlen(j->tube->name);
    z += job_record_size;
    z += j->body_size;

    /* plus space for a delete to come later */
    z += sizeof(size_t);
    z += job_record_size;

    return binlog_reserve_space(z);
}
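
/* For example, a put of a 100-byte body into a tube named "default" reserves
 * sizeof(size_t) + 7 + job_record_size + 100 bytes for the initial record,
 * plus sizeof(size_t) + job_record_size for the eventual delete record; the
 * latter quantity is the same z used by maintain_invariants_iter() above.
 * (Figures are illustrative; actual sizes depend on the platform ABI.) */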

size_t
binlog_reserve_space_update(job j)
{
    size_t z = 0;

    z += sizeof(size_t);
    z += job_record_size;
    return binlog_reserve_space(z);
}
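
/* Take an advisory fcntl() write lock on <binlog_dir>/lock so that at most
 * one beanstalkd process writes a given binlog directory at a time. The lock
 * lives as long as lock_fd stays open, which here is the life of the
 * process. */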
int
binlog_lock()
{
    int r;
    struct flock lock;
    char path[PATH_MAX];

    r = snprintf(path, PATH_MAX, "%s/lock", binlog_dir);
    if (r > PATH_MAX) return twarnx("path too long: %s", binlog_dir), 0;

    lock_fd = open(path, O_WRONLY|O_CREAT, 0600);
    if (lock_fd == -1) return twarn("open"), 0;

    lock.l_type = F_WRLCK;
    lock.l_whence = SEEK_SET;
    lock.l_start = 0;
    lock.l_len = 0;
    r = fcntl(lock_fd, F_SETLK, &lock);
    if (r) return twarn("fcntl"), 0;

    return 1;
}

void
binlog_init(job binlog_jobs)
{
    int binlog_index_min;
    struct stat sbuf;
    int fd, idx, r;
    size_t n;
    char path[PATH_MAX];
    binlog b;

    if (!binlog_dir) return;

    /* Recover any jobs in old binlogs */

    if (stat(binlog_dir, &sbuf) < 0) {
        if (mkdir(binlog_dir, 0700) < 0) return twarn("%s", binlog_dir);
    } else if (!(sbuf.st_mode & S_IFDIR)) {
        twarnx("%s", binlog_dir);
        return;
    }

    binlog_index_min = binlog_scan_dir();

    if (binlog_index_min) {
        for (idx = binlog_index_min; idx <= binlog_index; idx++) {
            r = snprintf(path, PATH_MAX, "%s/binlog.%d", binlog_dir, idx);
            if (r > PATH_MAX) return twarnx("path too long: %s", binlog_dir);

            fd = open(path, O_RDONLY);

            if (fd < 0) {
                twarn("%s", path);
            } else {
                b = binlog_iref(add_binlog(make_binlog(path)));
                b->fd = fd;
                binlog_read_log_file(b, binlog_jobs);
                close(fd);
                b->fd = -1;
                binlog_dref(b);
            }
        }
    }

    /* Set up for writing out new jobs */
    n = ensure_free_space(1);
    if (!n) return twarnx("error making first writable binlog");

    current_binlog = newest_binlog;
}

const char *
binlog_oldest_index()
{
    if (!oldest_binlog) return "0";

    return strrchr(oldest_binlog->path, '.') + 1;
}

const char *
binlog_current_index()
{
    if (!newest_binlog) return "0";

    return strrchr(newest_binlog->path, '.') + 1;
}