1 /* binlog.c - binary log implementation */
3 /* Copyright (C) 2008 Graham Barr
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
23 #endif /* else we get int types from config.h */
32 #include <sys/resource.h>
33 #include <sys/param.h>
// `binlog` is a typedef for a pointer to struct binlog (the struct's
// fields are outside this excerpt).
46 typedef struct binlog
*binlog
;
57 /* max size we will create a log file */
58 size_t binlog_size_limit
= BINLOG_SIZE_LIMIT_DEFAULT
;
// binlog_write_job calls fdatasync only when at least fsync_throttle_ms
// milliseconds have passed since last_fsync (both compared in msec).
61 size_t fsync_throttle_ms
= 0;
62 uint64_t last_fsync
= 0;
// Directory holding the binlog files; NULL means binlogging is disabled.
64 char *binlog_dir
= NULL
;
// Highest binlog file index: set from the directory scan at start-up and
// pre-incremented whenever a new binlog.<N> file is named.
65 static int binlog_index
= 0;
// On-disk format version: written at the start of every binlog file and
// compared against when a file is replayed.
66 static int binlog_version
= 5;
// Head of the list of binlogs, linked oldest -> newest through ->next.
// NOTE(review): the declarator list continues past this excerpt (trailing
// comma), presumably with the newest/current binlog pointers; confirm.
69 static binlog oldest_binlog
= 0,
// Bytes of struct job serialized per record: everything before `pad`.
73 static const size_t job_record_size
= offsetof(struct job
, pad
);
// Scan binlog_dir for entries named "binlog.<N>", tracking the smallest and
// largest N seen; binlog_index is set to the largest. binlog_init treats
// the result of this function as the minimum index (0 = nothing found).
// The function header, declarations and return are outside this excerpt.
// NOTE(review): confirm opendir() failure (NULL) is checked before readdir.
86 dirp
= opendir(binlog_dir
);
89 while ((dp
= readdir(dirp
)) != NULL
) {
90 name_len
= strlen(dp
->d_name
);
// Require the "binlog." prefix plus at least one more character.
91 if (name_len
> 7 && !strncmp("binlog.", dp
->d_name
, 7)) {
92 val
= strtol(dp
->d_name
+ 7, &endptr
, 10);
// Accept only suffixes strtol consumed entirely.
// NOTE(review): endptr is never NULL after strtol, so that half of the
// test is redundant. strtol also accepts a leading sign or whitespace
// (e.g. "binlog.-3" passes), and ERANGE is not checked.
93 if (endptr
&& *endptr
== 0) {
94 if (max
== 0 || val
> max
) max
= val
;
95 if (min
== 0 || val
< min
) min
= val
;
// NOTE(review): this (int) cast silently truncates an out-of-range index.
101 binlog_index
= (int) max
;
// Unlink the binlog at the head of the list by advancing oldest_binlog to
// its ->next. What else is done with b (presumably closing, deleting the
// file and freeing it) is outside this excerpt.
106 binlog_remove_oldest()
108 binlog b
= oldest_binlog
;
112 oldest_binlog
= b
->next
;
// Take a reference on b. Only the signature is visible here. Callers use
// the return value as the binlog itself (e.g. `j->binlog = binlog_iref(b)`)
// and binlog_open tests it for failure (`if (!binlog_iref(log))`).
119 binlog_iref(binlog b
)
// Drop a reference on b. It warns and returns without changing anything if
// refs is already below 1. Afterwards, every binlog at the head of the list
// whose refs has reached zero is removed, stopping at the first one still
// referenced, so files are discarded strictly oldest-first. The decrement
// itself is outside this excerpt.
// NOTE(review): some callers pass j->binlog unchecked; confirm a NULL b is
// handled on a line not visible here.
126 binlog_dref(binlog b
)
129 if (b
->refs
< 1) return twarnx("refs is zero for binlog: %s", b
->path
);
133 while (oldest_binlog
&& oldest_binlog
->refs
== 0) {
134 binlog_remove_oldest();
// Warn that replay is continuing past a problem, naming the file and the
// current file offset (obtained with lseek(fd, 0, SEEK_CUR)).
// NOTE(review): lseek returns off_t, which is printed with %u. This is a
// format/argument mismatch (undefined behaviour) wherever off_t is wider
// than unsigned int; cast to (long long) and use %lld. The surrounding
// conditional that chooses between this function and the macro of the same
// name below is outside this excerpt.
141 binlog_warn(int fd, const char* path, const char *msg)
143 warnx("WARNING, %s at %s:%u.\n%s", msg, path, lseek(fd, 0, SEEK_CUR),
144 " Continuing. You may be missing data.");
// Macro form (GNU named variadic `args...`) used by binlog_read_log_file.
// It prefixes the caller's format with "WARNING, " and appends the path and
// offset. It has the same %u / off_t mismatch as the function above.
148 #define binlog_warn(b, fmt, args...) \
149 warnx("WARNING, " fmt " at %s:%u. %s: ", \
150 ##args, b->path, lseek(b->fd, 0, SEEK_CUR), \
151 "Continuing. You may be missing data.")
// Replay one binlog file into binlog_jobs.
// First the version header is read; a file whose version differs from
// binlog_version is rejected. Then records are read until EOF, each laid out
// as:
//   namelen (size_t) | tube name (namelen bytes) |
//   job record (job_record_size bytes) | body (js.body_size bytes, only
//   present when namelen > 0)
// A short read anywhere warns that data may be missing and stops replaying
// this file; binlog_init then goes on to the next file.
154 binlog_read_log_file(binlog b
, job binlog_jobs
)
159 char tubename
[MAX_TUBE_NAME_LEN
];
164 r
= read(b
->fd
, &version
, sizeof(version
));
165 if (r
== -1) return twarn("read()");
166 if (r
< sizeof(version
)) {
167 return binlog_warn(b
, "EOF while reading version record");
170 if (version
!= binlog_version
) {
171 return warnx("%s: binlog version mismatch %d %d", b
->path
, version
,
// One iteration per record; the loop ends when no full length field remains.
175 while (read(b
->fd
, &namelen
, sizeof(size_t)) == sizeof(size_t)) {
// Reject names that would not fit in tubename with room for the NUL.
176 if (namelen
>= MAX_TUBE_NAME_LEN
) {
// NOTE(review): namelen is read as sizeof(size_t) bytes (presumably a
// size_t) but printed with %d; %zu would match.
177 return binlog_warn(b
, "namelen %d exceeds maximum of %d", namelen
, MAX_TUBE_NAME_LEN
- 1);
181 r
= read(b
->fd
, tubename
, namelen
);
182 if (r
== -1) return twarn("read()");
// NOTE(review): lseek takes (fd, offset, whence). The arguments here look
// swapped: this seeks to absolute offset SEEK_CUR (1 on common systems)
// with whence 0 (SEEK_SET) instead of staying at the current offset.
// Compare lseek(fd, 0, SEEK_CUR) in binlog_warn.
184 lseek(b
->fd
, SEEK_CUR
, 0);
185 return binlog_warn(b
, "EOF while reading tube name");
// Safe: namelen < MAX_TUBE_NAME_LEN was checked above.
189 tubename
[namelen
] = '\0';
190 r
= read(b
->fd
, &js
, job_record_size
);
191 if (r
== -1) return twarn("read()");
192 if (r
< job_record_size
) {
193 return binlog_warn(b
, "EOF while reading job record");
// NOTE(review): the switch on the record's state and the lookup that sets
// j (presumably by js.id) are outside this excerpt.
// JOB_STATE_INVALID (presumably a delete record): release the job's
// binlog reference; the rest of that case is not visible here.
200 case JOB_STATE_INVALID
:
203 binlog_dref(j
->binlog
);
// READY or DELAYED: a record carrying a tube name for an unknown job
// creates the job (make_job_with_id) in that tube and adds it to
// binlog_jobs.
208 case JOB_STATE_READY
:
209 case JOB_STATE_DELAYED
:
210 if (!j
&& namelen
> 0) {
211 t
= tube_find_or_make(tubename
);
212 j
= make_job_with_id(js
.pri
, js
.delay
, js
.ttr
, js
.body_size
,
214 j
->next
= j
->prev
= j
;
215 j
->created_at
= js
.created_at
;
216 job_insert(binlog_jobs
, j
);
218 if (js
.body_size
&& namelen
> 0) { /* namelen > 0 only on new jobs */
// A record claiming a larger body than the job already has is treated as
// corrupt: its binlog reference is dropped and replay stops.
219 if (js
.body_size
> j
->body_size
) {
220 warnx("job size increased from %zu to %zu", j
->body_size
,
223 binlog_dref(j
->binlog
);
// NOTE(review): this path reports "EOF while reading job body" although
// the cause is a size increase, which misleads whoever reads the log.
225 return binlog_warn(b
, "EOF while reading job body");
227 r
= read(b
->fd
, j
->body
, js
.body_size
);
228 if (r
== -1) return twarn("read()");
229 if (r
< js
.body_size
) {
// NOTE(review): confirm j->id's type matches %llu (cast if it does not).
230 warnx("dropping incomplete job %llu", j
->id
);
232 binlog_dref(j
->binlog
);
234 return binlog_warn(b
, "EOF while reading job body");
// Copy the deadline and per-job counters from the record.
241 j
->deadline_at
= js
.deadline_at
;
245 j
->timeout_ct
= js
.timeout_ct
;
246 j
->release_ct
= js
.release_ct
;
247 j
->bury_ct
= js
.bury_ct
;
248 j
->kick_ct
= js
.kick_ct
;
250 /* this is a complete record, so we can move the binlog ref */
251 if (namelen
&& js
.body_size
) {
252 binlog_dref(j
->binlog
);
253 j
->binlog
= binlog_iref(b
);
// Finish with b's file. If it is still open (fd >= 0), it is first truncated
// to the bytes actually used, binlog_size_limit - b->free, which discards
// the unused preallocated tail. The close and any fd reset are outside this
// excerpt.
260 binlog_close(binlog b
)
265 if (b
->fd
< 0) return;
267 // Some compilers give a warning if the return value of ftruncate is
268 // ignored. So we pretend to use it.
269 r
= ftruncate(b
->fd
, binlog_size_limit
- b
->free
);
271 // Nothing we can do. The user might see warnings next startup.
// Allocate a struct binlog with a copy of path stored in space placed right
// after the struct (b->path). Returns NULL with an "OOM" warning if malloc
// fails. The remaining field initialisation is outside this excerpt.
// NOTE(review): casting malloc's result is unnecessary in C.
280 make_binlog(char *path
)
284 b
= (binlog
) malloc(sizeof(struct binlog
) + strlen(path
) + 1);
285 if (!b
) return twarnx("OOM"), (binlog
) 0;
286 strcpy(b
->path
, path
);
// Build the binlog for the next file index, binlog_dir/binlog.<N> with
// N = ++binlog_index. Returns NULL when binlogging is disabled (no
// binlog_dir), when the path does not fit, or when make_binlog fails.
// The function header is outside this excerpt.
301 if (!binlog_dir
) return NULL
;
303 r
= snprintf(path
, PATH_MAX
, "%s/binlog.%d", binlog_dir
, ++binlog_index
);
// NOTE(review): snprintf returns the untruncated length excluding the NUL,
// so output was truncated whenever r >= PATH_MAX; this test misses
// r == PATH_MAX. binlog_index is also consumed even when this fails.
304 if (r
> PATH_MAX
) return twarnx("path too long: %s", binlog_dir
), (binlog
)0;
306 return make_binlog(path
);
// Append b to the end of the binlog list (oldest -> newest through ->next).
// If the list was empty, b also becomes oldest_binlog. Updating
// newest_binlog and the return value are outside this excerpt; binlog_init
// uses the return value directly.
// NOTE(review): binlog_init passes make_binlog()'s result straight in, and
// that can be NULL on OOM; confirm b is checked.
312 if (newest_binlog
) newest_binlog
->next
= b
;
314 if (!oldest_binlog
) oldest_binlog
= b
;
// Portable stand-in for posix_fallocate(). It writes zero-filled
// ZERO_BUF_SIZE-byte chunks from the start of the file until at least len
// bytes have been written, then seeks back to offset 0. Like
// posix_fallocate it returns an error number rather than -1: EINVAL for
// unsupported arguments, or errno from a failed write or lseek. The success
// return is outside this excerpt.
// NOTE(review): every write is a full ZERO_BUF_SIZE, so the file grows up to
// ZERO_BUF_SIZE - 1 bytes past len when len is not a multiple of it. Passing
// `buf` instead of `&buf` would be clearer (same address). A write()
// returning 0 would make the loop spin forever. `= {}` is an empty
// initializer (C23 or a GNU extension); `= {0}` is portable.
320 beanstalkd_fallocate(int fd
, off_t offset
, off_t len
)
325 #define ZERO_BUF_SIZE 512
326 char buf
[ZERO_BUF_SIZE
] = {}; /* initialize to zero */
328 /* we only support a 0 offset */
329 if (offset
!= 0) return EINVAL
;
331 if (len
<= 0) return EINVAL
;
333 for (i
= 0; i
< len
; i
+= w
) {
334 w
= write(fd
, &buf
, ZERO_BUF_SIZE
);
335 if (w
== -1) return errno
;
// Rewind so the caller's next write lands at the start of the file.
338 p
= lseek(fd
, 0, SEEK_SET
);
339 if (p
== -1) return errno
;
// Open log's file for writing (creating it if needed), preallocate
// binlog_size_limit bytes, and write the binlog_version header. If
// `written` is non-NULL it is set to 0 up front and later to the number of
// header bytes written. A reference on log is taken before the file is
// opened.
// NOTE(review): the file is created with mode 0400 (owner read-only). The
// creating open still gets write access, but confirm this is intended.
345 binlog_open(binlog log
, size_t *written
)
348 size_t bytes_written
;
350 if (written
) *written
= 0;
352 if (!binlog_iref(log
)) return;
354 fd
= open(log
->path
, O_WRONLY
| O_CREAT
, 0400);
// NOTE(review): the reference taken by binlog_iref above is not released on
// this early return.
356 if (fd
< 0) return twarn("Cannot open binlog %s", log
->path
);
// Preallocate the file. With posix_fallocate available it is tried first,
// and beanstalkd_fallocate is the fallback (the condition that selects the
// fallback is outside this excerpt). Otherwise the portable zero-filling
// path is used.
// NOTE(review): posix_fallocate returns its error number and does not set
// errno; confirm errno is set from r before twarn reports it.
358 #ifdef HAVE_POSIX_FALLOCATE
361 r
= posix_fallocate(fd
, 0, binlog_size_limit
);
363 r
= beanstalkd_fallocate(fd
, 0, binlog_size_limit
);
369 return twarn("Cannot allocate space for binlog %s", log
->path
);
373 /* Allocate space in a slow but portable way. */
376 r
= beanstalkd_fallocate(fd
, 0, binlog_size_limit
);
381 return twarn("Cannot allocate space for binlog %s", log
->path
);
// NOTE(review): bytes_written is a size_t, but write() returns ssize_t. A
// failed write (-1) converts to SIZE_MAX, so the `< sizeof(int)` test below
// does not catch it, and *written is reported as SIZE_MAX (make_future_binlog
// subtracts that from binlog_size_limit). Declare it ssize_t and test for
// a negative result explicitly.
386 bytes_written
= write(fd
, &binlog_version
, sizeof(int));
387 if (written
) *written
= bytes_written
;
389 if (bytes_written
< sizeof(int)) {
390 twarn("Cannot write to binlog");
399 /* returns 1 on success, 0 on error. */
// Move writing on to the next binlog in the list: close current_binlog and
// make its ->next current. Returns 0 when binlogging is off (no
// current_binlog). How a missing ->next is handled is outside this excerpt.
// NOTE(review): the assertion below is commented out, so any space still
// reserved on the old file is silently abandoned; confirm this is intended.
405 if (!current_binlog
) return 0;
407 next
= current_binlog
->next
;
411 /* assert(current_binlog->reserved == 0); */
413 binlog_close(current_binlog
);
414 current_binlog
= next
;
// Close the binlog currently being written. The enclosing function header
// is outside this excerpt.
423 binlog_close(current_binlog
);
426 /* Returns the number of jobs successfully written (either 0 or 1).
428 If this fails, something is seriously wrong. It should never fail because of
429 a full disk. (The binlog_reserve_space_* functions, on the other hand, can
430 fail because of a full disk.)
432 If we are not using the binlog at all (!current_binlog), then we pretend to
433 have made a successful write and return 1. */
435 binlog_write_job(job j
)
// The record is gathered into iovecs for writev():
//   vec[0] = tube_namelen, vec[1] = tube name,
//   vec[2] = first job_record_size bytes of *j, vec[3] = body.
// The name length and body contribute bytes only for READY, DELAYED and
// BURIED jobs.
// NOTE(review): tube_namelen is not initialised in its declaration; confirm
// it is zeroed (on a line outside this excerpt) before it is written for the
// other states.
438 size_t tube_namelen
, to_write
= 0;
439 struct iovec vec
[4], *vptr
;
443 if (!current_binlog
) return 1;
446 vec
[0].iov_base
= (char *) &tube_namelen
;
447 to_write
+= vec
[0].iov_len
= sizeof(size_t);
449 vec
[1].iov_base
= j
->tube
->name
;
452 vec
[2].iov_base
= (char *) j
;
453 to_write
+= vec
[2].iov_len
= job_record_size
;
455 if (j
->state
== JOB_STATE_READY
|| j
->state
== JOB_STATE_DELAYED
||
456 j
->state
== JOB_STATE_BURIED
) {
458 tube_namelen
= strlen(j
->tube
->name
);
459 to_write
+= vec
[1].iov_len
= tube_namelen
;
461 vec
[3].iov_base
= j
->body
;
462 to_write
+= vec
[3].iov_len
= j
->body_size
;
// An INVALID job releases its binlog reference; any other state is
// (presumably) rejected as unserializable. The branch boundaries between
// these are outside this excerpt.
464 } else if (j
->state
== JOB_STATE_INVALID
) {
465 if (j
->binlog
) binlog_dref(j
->binlog
);
468 return twarnx("unserializable job state: %d", j
->state
), 0;
// If the record does not fit in the space reserved on the current file,
// move on to the next binlog.
// NOTE(review): the new current_binlog's reservation is not re-checked in
// the visible lines; confirm.
471 if (to_write
> current_binlog
->reserved
) {
472 r
= binlog_use_next();
473 if (!r
) return twarnx("failed to use next binlog"), 0;
// A job with a nonzero state that is not yet in any binlog takes a
// reference on the file it is first written to.
476 if (j
->state
&& !j
->binlog
) j
->binlog
= binlog_iref(current_binlog
);
// Write until the whole record is out. EAGAIN and EINTR are retried
// (presumably after a failed writev; that error check is outside this
// excerpt), and other failures close the current binlog. Bytes written are
// charged against both the file's and the job's reservation.
478 while (to_write
> 0) {
479 written
= writev(current_binlog
->fd
, vec
, vcnt
);
482 if (errno
== EAGAIN
) continue;
483 if (errno
== EINTR
) continue;
486 binlog_close(current_binlog
);
491 current_binlog
->reserved
-= written
;
492 j
->reserved_binlog_space
-= written
;
// After a partial write, skip the fully written iovecs and advance into the
// partially written one.
// NOTE(review): writev is called again with (vec, vcnt), so each fully
// consumed iovec must also have iov_len set to 0, or its data is re-sent.
// That line is outside this excerpt; confirm it is present.
495 if (to_write
> 0 && written
> 0) {
496 for (vptr
= vec
; written
>= vptr
->iov_len
; vptr
++) {
497 written
-= vptr
->iov_len
;
500 vptr
->iov_base
= (char *) vptr
->iov_base
+ written
;
501 vptr
->iov_len
-= written
;
// Sync at most once per fsync_throttle_ms (times in msec).
// NOTE(review): the update of last_fsync is not visible in this excerpt;
// without it, every write after the first interval would sync.
505 now
= now_usec() / 1000; /* usec -> msec */
506 if (enable_fsync
&& now
- last_fsync
>= fsync_throttle_ms
) {
507 r
= fdatasync(current_binlog
->fd
);
508 if (r
== -1) return twarn("fdatasync"), 0;
// Create and open the next binlog file so that space can be reserved in it.
// Returns NULL with a warning if the binlog cannot be made. On success
// b->free is binlog_size_limit minus the header bytes binlog_open wrote.
// The open-failure branch and the final return are outside this excerpt.
// NOTE(review): header is a size_t. If binlog_open's header write fails with
// -1 it reports SIZE_MAX, and the subtraction below wraps around.
521 /* open a new binlog with more space to reserve */
522 b
= make_next_binlog();
523 if (!b
) return twarnx("error making next binlog"), (binlog
) 0;
524 binlog_open(b
, &header
);
526 /* open failed, so we can't reserve any space */
532 b
->free
= binlog_size_limit
- header
;
// Nonzero when n reserved bytes can be moved: `from` holds at least n
// reserved bytes and `to` has at least n free bytes.
538 can_move_reserved(size_t n
, binlog from
, binlog to
)
540 return from
->reserved
>= n
&& to
->free
>= n
;
// Move n bytes of reservation from `from` to `to`. The body is outside this
// excerpt; callers check can_move_reserved or call ensure_free_space first.
544 move_reserved(size_t n
, binlog from
, binlog to
)
// Make sure the newest binlog has at least n free bytes. Returns n at once
// if it already does. Otherwise a new binlog is created with
// make_future_binlog, and 0 is returned with a warning if that fails.
// Linking the new file into the list and the success return are outside
// this excerpt.
553 ensure_free_space(size_t n
)
557 if (newest_binlog
&& newest_binlog
->free
>= n
) return n
;
559 /* open a new binlog */
560 fb
= make_future_binlog();
561 if (!fb
) return twarnx("make_future_binlog"), 0;
568 * 1: b->reserved is congruent to n (mod z).
569 * 2: all future binlogs ->reserved is congruent to 0 (mod z).
570 * Returns t on success, and 0 on failure. (Therefore t should be nonzero.) */
// Walk the binlog list from b. After setting aside n bytes on this file,
// each file's reserved count should be a multiple of z, the size of one
// delete record (sizeof(size_t) + job_record_size). For each file, the
// leftover remainder is handled in one of two ways:
//   - top it up by moving `complement` reserved bytes back from the newest
//     binlog, or
//   - move the remainder forward to the newest binlog, allocating a new file
//     via ensure_free_space when needed.
// On failure, t reserved bytes are taken back off newest_binlog and 0 is
// returned.
// NOTE(review): the terminating case (end of list) is outside this excerpt.
// Recursion depth grows with the number of binlogs. The undo paths decrement
// ->reserved only; binlog_reserve_space moved those bytes out of ->free, so
// confirm ->free is restored. %zd is used for size_t values, where %zu is
// the matching specifier.
572 maintain_invariants_iter(binlog b
, size_t n
, size_t t
)
574 size_t reserved_later
, remainder
, complement
, z
, r
;
576 /* In this function, reserved bytes are conserved (they are neither created
577 * nor destroyed). We just move them around to preserve the invariant. We
578 * might have to create new free space (i.e. allocate a new binlog file),
583 z
= sizeof(size_t) + job_record_size
;
584 reserved_later
= b
->reserved
- n
;
585 remainder
= reserved_later
% z
;
586 if (remainder
== 0) return maintain_invariants_iter(b
->next
, 0, t
);
588 if (b
== newest_binlog
) {
589 twarnx("newest binlog has invalid %zd reserved", b
->reserved
);
590 /* We have failed, so undo the reservation and return 0. */
591 if (newest_binlog
->reserved
>= t
) {
592 newest_binlog
->reserved
-= t
;
594 twarnx("failed to unreserve %zd bytes", t
); /* can't happen */
599 complement
= z
- remainder
;
600 if (can_move_reserved(complement
, newest_binlog
, b
)) {
601 move_reserved(complement
, newest_binlog
, b
);
602 return maintain_invariants_iter(b
->next
, 0, t
);
605 r
= ensure_free_space(remainder
);
606 if (r
!= remainder
) {
607 twarnx("ensure_free_space");
608 /* We have failed, so undo the reservation and return 0. */
609 if (newest_binlog
->reserved
>= t
) {
610 newest_binlog
->reserved
-= t
;
612 twarnx("failed to unreserve %zd bytes", t
); /* can't happen */
616 move_reserved(remainder
, b
, newest_binlog
);
617 return maintain_invariants_iter(b
->next
, 0, t
);
621 /* Preserve some invariants immediately after any space reservation
622 * (where z is the size of a delete record in the binlog).
623 * Invariant 1: current_binlog->reserved >= n.
624 * Invariant 2: current_binlog->reserved is congruent to n (mod z).
625 * Invariant 3: all future binlogs ->reserved is congruent to 0 (mod z). */
// Called right after n bytes are reserved. While current_binlog holds fewer
// than n reserved bytes, its reservation (to_move) is shifted to the newest
// binlog, with space ensured there first; the remaining steps of that loop
// are outside this excerpt. Then maintain_invariants_iter restores
// invariants 2 and 3 starting from current_binlog, passing t = n, so the
// result is n on success and 0 on failure.
// NOTE(review): the failure path takes n reserved bytes back off
// newest_binlog without restoring ->free; confirm. %zu matches size_t
// better than %zd.
627 maintain_invariants(size_t n
)
631 /* In this function, reserved bytes are conserved (they are neither created
632 * nor destroyed). We just move them around to preserve the invariant. We
633 * might have to create new free space (i.e. allocate a new binlog file),
637 /* This is a loop, but it's guaranteed to run at most once. The proof is
638 * left as an exercise for the reader. */
639 while (current_binlog
->reserved
< n
) {
640 size_t to_move
= current_binlog
->reserved
;
642 r
= ensure_free_space(to_move
);
644 twarnx("ensure_free_space");
645 /* We have failed, so undo the reservation and return 0. */
646 if (newest_binlog
->reserved
>= n
) {
647 newest_binlog
->reserved
-= n
;
649 twarnx("failed to unreserve %zd bytes", n
); /* can't happen */
654 move_reserved(to_move
, current_binlog
, newest_binlog
);
658 /* Invariants 2 and 3. */
659 return maintain_invariants_iter(current_binlog
, n
, n
);
662 /* Returns the number of bytes successfully reserved: either 0 or n. */
664 binlog_reserve_space(size_t n
)
// Space comes from current_binlog's free space when it has room. Otherwise
// it comes from the newest binlog, allocating a new file through
// ensure_free_space if necessary. Either way the bytes move from ->free to
// ->reserved, and maintain_invariants then rebalances the reservation
// across files.
// NOTE(review): for n == 0, maintain_invariants returns 0, which cannot be
// told apart from failure. The visible callers always add job_record_size,
// so n > 0 for them.
668 /* This return value must be nonzero but is otherwise ignored. */
669 if (!current_binlog
) return 1;
671 if (current_binlog
->free
>= n
) {
672 current_binlog
->free
-= n
;
673 current_binlog
->reserved
+= n
;
674 return maintain_invariants(n
);
677 r
= ensure_free_space(n
);
678 if (r
!= n
) return twarnx("ensure_free_space"), 0;
680 newest_binlog
->free
-= n
;
681 newest_binlog
->reserved
+= n
;
682 return maintain_invariants(n
);
685 /* Returns the number of bytes reserved. */
687 binlog_reserve_space_put(job j
)
// The reservation covers two records: the initial full record for j (tube
// name plus job record; the initialisation of z, the size_t length field
// and the body term are on lines outside this excerpt) and a later delete
// record. The result is binlog_reserve_space's: z on success, 0 on failure,
// 1 when binlogging is off.
691 /* reserve space for the initial job record */
693 z
+= strlen(j
->tube
->name
);
694 z
+= job_record_size
;
697 /* plus space for a delete to come later */
699 z
+= job_record_size
;
701 return binlog_reserve_space(z
);
// Reserve space for one more state-update record of j: a job record, plus
// whatever z is initialised with on a line outside this excerpt (presumably
// the size_t length field). Returns binlog_reserve_space's result.
705 binlog_reserve_space_update(job j
)
710 z
+= job_record_size
;
711 return binlog_reserve_space(z
);
// Take an exclusive, non-blocking fcntl() lock (F_WRLCK with F_SETLK) on
// binlog_dir/lock, creating the file with mode 0600 if needed. This is
// presumably so that two processes cannot share one binlog directory.
// Returns 0 with a warning on failure. The function header, the
// l_start/l_len setup and the success return are outside this excerpt.
// NOTE(review): lock_fd must stay open for the lock to persist, because
// POSIX releases a process's fcntl locks when any descriptor for the file is
// closed.
721 r
= snprintf(path
, PATH_MAX
, "%s/lock", binlog_dir
);
// NOTE(review): snprintf truncated the output whenever r >= PATH_MAX; this
// test misses r == PATH_MAX.
722 if (r
> PATH_MAX
) return twarnx("path too long: %s", binlog_dir
), 0;
724 lock_fd
= open(path
, O_WRONLY
|O_CREAT
, 0600);
725 if (lock_fd
== -1) return twarn("open"), 0;
727 lock
.l_type
= F_WRLCK
;
728 lock
.l_whence
= SEEK_SET
;
731 r
= fcntl(lock_fd
, F_SETLK
, &lock
);
732 if (r
) return twarn("fcntl"), 0;
// Start-up, which does nothing unless binlogging is enabled (binlog_dir set):
//   1. Make sure binlog_dir exists, creating it with mode 0700 if stat fails.
//   2. Replay every binlog.<N> file into binlog_jobs, from the smallest
//      index found by binlog_scan_dir up to binlog_index.
//   3. Make the newest binlog, which has free space, current_binlog for new
//      writes.
738 binlog_init(job binlog_jobs
)
740 int binlog_index_min
;
747 if (!binlog_dir
) return;
749 /* Recover any jobs in old binlogs */
751 if (stat(binlog_dir
, &sbuf
) < 0) {
752 if (mkdir(binlog_dir
, 0700) < 0) return twarn("%s", binlog_dir
);
// NOTE(review): S_IFDIR is a file-type value within S_IFMT, not an
// independent flag bit. `st_mode & S_IFDIR` is also nonzero for block
// devices and sockets, whose type codes share that bit. Use
// S_ISDIR(sbuf.st_mode).
753 } else if (!(sbuf
.st_mode
& S_IFDIR
)) {
754 twarnx("%s", binlog_dir
);
758 binlog_index_min
= binlog_scan_dir();
760 if (binlog_index_min
) {
761 for (idx
= binlog_index_min
; idx
<= binlog_index
; idx
++) {
762 r
= snprintf(path
, PATH_MAX
, "%s/binlog.%d", binlog_dir
, idx
);
// NOTE(review): truncation happens when r >= PATH_MAX, not only when
// r > PATH_MAX. Also, if binlog_init is void (as the bare `return;` above
// suggests), returning an expression from it is a constraint violation in
// ISO C that GCC accepts only as an extension.
763 if (r
> PATH_MAX
) return twarnx("path too long: %s", binlog_dir
);
// NOTE(review): how fd is attached to b, and how an open failure is
// handled, are outside this excerpt.
765 fd
= open(path
, O_RDONLY
);
// NOTE(review): make_binlog returns NULL on OOM, and that result is passed
// on here unchecked.
770 b
= binlog_iref(add_binlog(make_binlog(path
)));
772 binlog_read_log_file(b
, binlog_jobs
);
782 /* Set up for writing out new jobs */
783 n
= ensure_free_space(1);
784 if (!n
) return twarnx("error making first writable binlog");
786 current_binlog
= newest_binlog
;
// Return the numeric suffix (the "N" of "binlog.N") of the oldest binlog's
// path as a string, or "0" when there is no binlog. This relies on the path
// containing a '.'; the paths built in this file always end in "binlog.<N>".
790 binlog_oldest_index()
792 if (!oldest_binlog
) return "0";
794 return strrchr(oldest_binlog
->path
, '.') + 1;
// Return the numeric suffix of newest_binlog's path, or "0" when there is
// none. Despite the name, this reports the most recently created file. That
// file can be ahead of current_binlog (the one being written), because
// space reservations may open future files.
798 binlog_current_index()
800 if (!newest_binlog
) return "0";
802 return strrchr(newest_binlog
->path
, '.') + 1;