Portably allocate space if there's no built in way.
[beanstalkd.git] / binlog.c
blob20080f9d18a141b652642d204769d040e8164359
1 /* binlog.c - binary log implementation */
3 /* Copyright (C) 2008 Graham Barr
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #include <stdlib.h>
20 #include <stdio.h>
21 #include <fcntl.h>
22 #include <unistd.h>
23 #include <string.h>
24 #include <errno.h>
25 #include <dirent.h>
26 #include <sys/resource.h>
27 #include <sys/param.h>
28 #include <sys/uio.h>
29 #include <sys/stat.h>
30 #include <stdarg.h>
31 #include <limits.h>
33 #include "tube.h"
34 #include "job.h"
35 #include "binlog.h"
36 #include "util.h"
37 #include "config.h"
39 /* max size we will create a log file */
40 size_t binlog_size_limit = 10 << 20;
42 char *binlog_dir = NULL;
43 static int binlog_index = 0;
44 static int binlog_fd = -1, next_binlog_fd = -1;
45 static int binlog_version = 2;
46 static size_t binlog_space = 0, next_binlog_space = 0;
47 static size_t binlog_reserved = 0, next_binlog_reserved = 0;
48 static size_t bytes_written;
49 static int lock_fd;
51 static binlog first_binlog = NULL, last_binlog = NULL, next_binlog = NULL;
53 static int
54 binlog_scan_dir()
56 DIR *dirp;
57 struct dirent *dp;
58 long min = 0;
59 long max = 0;
60 long val;
61 char *endptr;
62 size_t name_len;
64 dirp = opendir(binlog_dir);
65 if (!dirp) return 0;
67 while ((dp = readdir(dirp)) != NULL) {
68 name_len = strlen(dp->d_name);
69 if (name_len > 7 && !strncmp("binlog.", dp->d_name, 7)) {
70 val = strtol(dp->d_name + 7, &endptr, 10);
71 if (endptr && *endptr == 0) {
72 if (max == 0 || val > max) max = val;
73 if (min == 0 || val < min) min = val;
78 closedir(dirp);
79 binlog_index = (int) max;
80 return (int) min;
83 static void
84 binlog_remove_first()
86 binlog b = first_binlog;
88 if (!b) return;
90 first_binlog = b->next;
91 if (!first_binlog) last_binlog = NULL;
93 unlink(b->path);
94 free(b);
97 static binlog
98 binlog_iref(binlog b)
100 if (b) b->refs++;
101 return b;
104 static void
105 binlog_dref(binlog b)
107 if (!b) return;
108 if (b->refs < 1) return twarnx("refs is zero for binlog: %s", b->path);
110 --b->refs;
111 if (b->refs < 1) {
112 while (first_binlog && first_binlog->refs == 0) binlog_remove_first();
117 static void
118 binlog_warn(int fd, const char* path, const char *msg)
120 warnx("WARNING, %s at %s:%u.\n%s", msg, path, lseek(fd, 0, SEEK_CUR),
121 " Continuing. You may be missing data.");
/*
 * Warn about a problem found while replaying a binlog, reporting the file
 * path and the current read offset of fd.
 *
 * fmt must be a string literal (it is pasted into the format string) and may
 * be followed by arguments for its own conversions.  The offset is cast to
 * long long and printed with %lld because off_t can be wider than unsigned
 * int; printing it with %u also corrupted the argument that follows it.
 */
#define binlog_warn(fd, path, fmt, args...) \
    warnx("WARNING, " fmt " at %s:%lld. %s: ", \
          ##args, (path), (long long) lseek((fd), 0, SEEK_CUR), \
          "Continuing. You may be missing data.")
130 static void
131 binlog_read_one(int fd, job binlog_jobs, const char *path)
133 struct job js;
134 tube t;
135 job j;
136 char tubename[MAX_TUBE_NAME_LEN];
137 size_t namelen;
138 ssize_t r;
139 int version;
141 r = read(fd, &version, sizeof(version));
142 if (r == -1) return twarn("read()");
143 if (r < sizeof(version)) {
144 return binlog_warn(fd, path, "EOF while reading version record");
147 if (version != binlog_version) {
148 return warnx("%s: binlog version mismatch %d %d", path, version,
149 binlog_version);
152 while (read(fd, &namelen, sizeof(size_t)) == sizeof(size_t)) {
153 if (namelen > 0) {
154 r = read(fd, tubename, namelen);
155 if (r == -1) return twarn("read()");
156 if (r < namelen) {
157 lseek(fd, SEEK_CUR, 0);
158 return binlog_warn(fd, path, "EOF while reading tube name");
162 tubename[namelen] = '\0';
163 r = read(fd, &js, sizeof(struct job));
164 if (r == -1) return twarn("read()");
165 if (r < sizeof(struct job)) {
166 return binlog_warn(fd, path, "EOF while reading job record");
169 if (!js.id) break;
171 j = job_find(js.id);
172 switch (js.state) {
173 case JOB_STATE_INVALID:
174 if (j) {
175 job_remove(j);
176 binlog_dref(j->binlog);
177 job_free(j);
178 j = NULL;
180 break;
181 case JOB_STATE_READY:
182 case JOB_STATE_DELAYED:
183 if (!j) {
184 t = tube_find_or_make(tubename);
185 j = make_job_with_id(js.pri, js.delay, js.ttr, js.body_size,
186 t, js.id);
187 j->next = j->prev = j;
188 j->creation = js.creation;
189 job_insert(binlog_jobs, j);
191 if (js.body_size) {
192 if (js.body_size > j->body_size) {
193 warnx("job size increased from %zu to %zu", j->body_size,
194 js.body_size);
195 job_remove(j);
196 binlog_dref(j->binlog);
197 job_free(j);
198 return binlog_warn(fd, path, "EOF while reading job body");
200 r = read(fd, j->body, js.body_size);
201 if (r == -1) return twarn("read()");
202 if (r < js.body_size) {
203 warnx("dropping incomplete job %llu", j->id);
204 job_remove(j);
205 binlog_dref(j->binlog);
206 job_free(j);
207 return binlog_warn(fd, path, "EOF while reading job body");
210 break;
212 if (j) {
213 j->state = js.state;
214 j->deadline = js.deadline;
215 j->pri = js.pri;
216 j->delay = js.delay;
217 j->ttr = js.ttr;
218 j->timeout_ct = js.timeout_ct;
219 j->release_ct = js.release_ct;
220 j->bury_ct = js.bury_ct;
221 j->kick_ct = js.kick_ct;
223 /* this is a complete record, so we can move the binlog ref */
224 if (namelen && js.body_size) {
225 binlog_dref(j->binlog);
226 j->binlog = binlog_iref(last_binlog);
232 static void
233 binlog_close_last()
235 if (binlog_fd < 0) return;
236 close(binlog_fd);
237 binlog_dref(last_binlog);
238 binlog_fd = -1;
241 static binlog
242 make_binlog(char *path)
244 binlog b;
246 b = (binlog) malloc(sizeof(struct binlog) + strlen(path) + 1);
247 if (!b) return twarnx("OOM"), NULL;
248 strcpy(b->path, path);
249 b->refs = 0;
250 b->next = NULL;
251 return b;
254 static binlog
255 make_next_binlog()
257 int r;
258 char path[PATH_MAX];
260 if (!binlog_dir) return NULL;
262 r = snprintf(path, PATH_MAX, "%s/binlog.%d", binlog_dir, ++binlog_index);
263 if (r > PATH_MAX) return twarnx("path too long: %s", binlog_dir), NULL;
265 return make_binlog(path);
268 static binlog
269 add_binlog(binlog b)
271 if (last_binlog) last_binlog->next = b;
272 last_binlog = b;
273 if (!first_binlog) first_binlog = b;
275 return b;
278 static int
279 binlog_open(binlog log)
281 int fd;
283 if (!binlog_iref(log)) return -1;
285 fd = open(log->path, O_WRONLY | O_CREAT, 0400);
287 if (fd < 0) return twarn("Cannot open binlog %s", log->path), -1;
289 #ifdef HAVE_POSIX_FALLOCATE
291 int r;
292 r = posix_fallocate(fd, 0, binlog_size_limit);
293 if (r) {
294 close(fd);
295 binlog_dref(log);
296 errno = r;
297 return twarn("Cannot allocate space for binlog %s", log->path), -1;
300 #else
301 /* Allocate space in a slow but portable way. */
303 size_t i;
304 ssize_t w;
305 off_t p;
306 #define ZERO_BUF_SIZE 512
307 char buf[ZERO_BUF_SIZE] = {}; /* initialize to zero */
309 for (i = 0; i < binlog_size_limit; i += w) {
310 w = write(fd, &buf, ZERO_BUF_SIZE);
311 if (w == -1) {
312 twarn("Cannot allocate space for binlog %s", log->path);
313 close(fd);
314 binlog_dref(log);
315 return -1;
319 p = lseek(fd, 0, SEEK_SET);
320 if (p == -1) {
321 twarn("lseek");
322 close(fd);
323 binlog_dref(log);
324 return -1;
327 #endif
329 bytes_written = write(fd, &binlog_version, sizeof(int));
331 if (bytes_written < sizeof(int)) {
332 twarn("Cannot write to binlog");
333 close(fd);
334 binlog_dref(log);
335 return -1;
338 return fd;
341 /* returns 1 on success, 0 on error. */
342 static int
343 binlog_use_next()
345 if (binlog_fd < 0) return 0;
346 if (next_binlog_fd < 0) return 0;
347 if (binlog_reserved > next_binlog_space) return twarnx("overextended"), 0;
349 binlog_close_last();
351 binlog_fd = next_binlog_fd;
352 add_binlog(next_binlog);
354 next_binlog = NULL;
355 next_binlog_fd = -1;
357 binlog_space = next_binlog_space - binlog_reserved;
358 binlog_reserved = next_binlog_reserved + binlog_reserved;
360 next_binlog_reserved = next_binlog_space = 0;
361 return 1;
/*
 * Shut the binlog down: first try to switch to the successor binlog (the
 * result is deliberately ignored), then close whichever binlog is current.
 */
void
binlog_close()
{
    (void) binlog_use_next();
    binlog_close_last();
}
371 /* Returns the number of jobs successfully written (either 0 or 1). */
372 /* If we are not using the binlog at all (binlog_fd < 0), then we pretend to
373 have made a successful write and return 1. */
375 binlog_write_job(job j)
377 ssize_t written;
378 size_t tube_namelen, to_write = 0;
379 struct iovec vec[4], *vptr;
380 int vcnt = 3, r;
382 if (binlog_fd < 0) return 1;
383 tube_namelen = 0;
385 vec[0].iov_base = (char *) &tube_namelen;
386 to_write += vec[0].iov_len = sizeof(size_t);
388 vec[1].iov_base = j->tube->name;
389 vec[1].iov_len = 0;
391 /* we could save some bytes in the binlog file by only saving some parts of
392 * the job struct */
393 vec[2].iov_base = (char *) j;
394 to_write += vec[2].iov_len = sizeof(struct job);
396 printf("writing job %lld state %d\n", j->id, j->state);
398 if (j->state == JOB_STATE_READY || j->state == JOB_STATE_DELAYED) {
399 if (!j->binlog) {
400 tube_namelen = strlen(j->tube->name);
401 to_write += vec[1].iov_len = tube_namelen;
402 vcnt = 4;
403 vec[3].iov_base = j->body;
404 to_write += vec[3].iov_len = j->body_size;
406 } else if (j->state == JOB_STATE_INVALID) {
407 if (j->binlog) binlog_dref(j->binlog);
408 j->binlog = NULL;
409 } else {
410 return twarnx("unserializable job state: %d", j->state), 0;
413 if (to_write > binlog_reserved) {
414 r = binlog_use_next();
415 if (!r) return twarnx("failed to use next binlog"), 0;
418 if (j->state && !j->binlog) j->binlog = binlog_iref(last_binlog);
420 while (to_write > 0) {
421 written = writev(binlog_fd, vec, vcnt);
423 if (written < 0) {
424 if (errno == EAGAIN) continue;
425 if (errno == EINTR) continue;
427 twarn("writev");
428 binlog_close_last();
429 return 0;
432 bytes_written += written;
433 to_write -= written;
434 if (to_write > 0 && written > 0) {
435 for (vptr = vec; written >= vptr->iov_len; vptr++) {
436 written -= vptr->iov_len;
437 vptr->iov_len = 0;
439 vptr->iov_base = (char *) vptr->iov_base + written;
440 vptr->iov_len -= written;
444 return 1;
447 /* Returns the number of bytes successfully reserved: either 0 or n. */
448 static size_t
449 binlog_reserve_space(size_t n)
451 /* This value must be nonzero but is otherwise ignored. */
452 if (binlog_fd < 0) return 1;
454 if (n <= binlog_space) {
455 binlog_space -= n;
456 binlog_reserved += n;
457 return n;
460 if (n <= next_binlog_space) {
461 next_binlog_space -= n;
462 next_binlog_reserved += n;
463 return n;
466 /* The next binlog is already allocated and it is full. */
467 if (next_binlog_fd >= 0) return 0;
469 /* open a new binlog with more space to reserve */
470 next_binlog = make_next_binlog();
471 if (!next_binlog) return twarnx("error making next binlog"), 0;
472 next_binlog_fd = binlog_open(next_binlog);
474 /* open failed, so we can't reserve any space */
475 if (next_binlog_fd < 0) return 0;
477 next_binlog_space = binlog_size_limit - bytes_written - n;
478 next_binlog_reserved = n;
480 return n;
483 /* Returns the number of bytes reserved. */
484 size_t
485 binlog_reserve_space_put(job j)
487 size_t z = 0;
489 /* reserve space for the initial job record */
490 z += sizeof(size_t);
491 z += strlen(j->tube->name);
492 z += sizeof(struct job);
493 z += j->body_size;
495 /* plus space for a delete to come later */
496 z += sizeof(size_t);
497 z += sizeof(struct job);
499 return binlog_reserve_space(z);
502 size_t
503 binlog_reserve_space_update(job j)
505 size_t z = 0;
507 z += sizeof(size_t);
508 z += sizeof(struct job);
509 return binlog_reserve_space(z);
512 void
513 binlog_read(job binlog_jobs)
515 int binlog_index_min;
516 struct stat sbuf;
517 int fd, idx, r;
518 char path[PATH_MAX];
519 binlog b;
521 if (!binlog_dir) return;
523 if (stat(binlog_dir, &sbuf) < 0) {
524 if (mkdir(binlog_dir, 0700) < 0) return twarn("%s", binlog_dir);
525 } else if (!(sbuf.st_mode & S_IFDIR)) {
526 twarnx("%s", binlog_dir);
527 return;
530 binlog_index_min = binlog_scan_dir();
532 if (binlog_index_min) {
533 for (idx = binlog_index_min; idx <= binlog_index; idx++) {
534 r = snprintf(path, PATH_MAX, "%s/binlog.%d", binlog_dir, idx);
535 if (r > PATH_MAX) return twarnx("path too long: %s", binlog_dir);
537 fd = open(path, O_RDONLY);
539 if (fd < 0) {
540 twarn("%s", path);
541 } else {
542 b = binlog_iref(add_binlog(make_binlog(path)));
543 binlog_read_one(fd, binlog_jobs, path);
544 close(fd);
545 binlog_dref(b);
553 binlog_lock()
555 int r;
556 struct flock lock;
557 char path[PATH_MAX];
559 r = snprintf(path, PATH_MAX, "%s/lock", binlog_dir);
560 if (r > PATH_MAX) return twarnx("path too long: %s", binlog_dir), 0;
562 lock_fd = open(path, O_WRONLY|O_CREAT, 0600);
563 if (lock_fd == -1) return twarn("open"), 0;
565 lock.l_type = F_WRLCK;
566 lock.l_whence = SEEK_SET;
567 lock.l_start = 0;
568 lock.l_len = 0;
569 r = fcntl(lock_fd, F_SETLK, &lock);
570 if (r) return twarn("fcntl"), 0;
572 return 1;
575 void
576 binlog_init()
578 binlog log;
580 if (!binlog_dir) return;
582 log = make_next_binlog();
583 if (!log) return twarnx("error making first binlog");
584 binlog_fd = binlog_open(log);
585 if (binlog_fd >= 0) add_binlog(log);
588 const char *
589 binlog_oldest_index()
591 if (!first_binlog) return "0";
593 return strrchr(first_binlog->path, '.') + 1;
596 const char *
597 binlog_current_index()
599 if (!last_binlog) return "0";
601 return strrchr(last_binlog->path, '.') + 1;