Rename this.
[beanstalkd.git] / binlog.c
blob159c0b2803dca3cff12604d0fe7e22f02d35de7e
1 /* binlog.c - binary log implementation */
3 /* Copyright (C) 2008 Graham Barr
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #include <stdlib.h>
20 #include <stdio.h>
21 #include <fcntl.h>
22 #include <unistd.h>
23 #include <string.h>
24 #include <errno.h>
25 #include <dirent.h>
26 #include <sys/resource.h>
27 #include <sys/param.h>
28 #include <sys/uio.h>
29 #include <sys/stat.h>
30 #include <stdarg.h>
31 #include <limits.h>
33 #include "tube.h"
34 #include "job.h"
35 #include "binlog.h"
36 #include "util.h"
38 /* max size we will create a log file */
39 size_t binlog_size_limit = 10 << 20;
41 char *binlog_dir = NULL;
42 static int binlog_index = 0;
43 static int binlog_fd = -1;
44 static int binlog_version = 1;
45 static size_t bytes_written;
46 static int lock_fd;
48 static binlog first_binlog = NULL, last_binlog = NULL;
50 static int
51 binlog_scan_dir()
53 DIR *dirp;
54 struct dirent *dp;
55 long min = 0;
56 long max = 0;
57 long val;
58 char *endptr;
59 size_t name_len;
61 dirp = opendir(binlog_dir);
62 if (!dirp) return 0;
64 while ((dp = readdir(dirp)) != NULL) {
65 name_len = strlen(dp->d_name);
66 if (name_len > 7 && !strncmp("binlog.", dp->d_name, 7)) {
67 val = strtol(dp->d_name + 7, &endptr, 10);
68 if (endptr && *endptr == 0) {
69 if (max == 0 || val > max) max = val;
70 if (min == 0 || val < min) min = val;
75 closedir(dirp);
76 binlog_index = (int) max;
77 return (int) min;
80 static void
81 binlog_remove_first()
83 binlog b = first_binlog;
85 if (!b) return;
87 first_binlog = b->next;
88 if (!first_binlog) last_binlog = NULL;
90 unlink(b->path);
91 free(b);
94 static binlog
95 binlog_iref(binlog b)
97 if (b) b->refs++;
98 return b;
101 static void
102 binlog_dref(binlog b)
104 if (!b) return;
105 if (b->refs < 1) return twarnx("refs is zero for binlog: %s", b->path);
107 --b->refs;
108 if (b->refs < 1) {
109 while (first_binlog && first_binlog->refs == 0) binlog_remove_first();
113 static void
114 binlog_warn(const char *msg, int fd, const char* path)
116 warnx("WARNING, %s at %s:%u.\n%s", msg, path, lseek(fd, 0, SEEK_CUR),
117 " Continuing with next file. You may be missing data.");
120 static void
121 binlog_read_one(int fd, job binlog_jobs, const char *path)
123 struct job js;
124 tube t;
125 job j;
126 char tubename[MAX_TUBE_NAME_LEN];
127 size_t namelen;
128 ssize_t r;
129 int version;
131 r = read(fd, &version, sizeof(version));
132 if (r == -1) return twarn("read()");
133 if (r < sizeof(version)) {
134 return binlog_warn("version record is too short", fd, path);
137 if (version != binlog_version) {
138 return warnx("%s: binlog version mismatch %d %d", path, version,
139 binlog_version);
142 while (read(fd, &namelen, sizeof(size_t)) == sizeof(size_t)) {
143 if (namelen > 0) {
144 r = read(fd, tubename, namelen);
145 if (r == -1) return twarn("read()");
146 if (r < namelen) {
147 lseek(fd, SEEK_CUR, 0);
148 return binlog_warn("tube name is too short", fd, path);
152 tubename[namelen] = '\0';
153 r = read(fd, &js, sizeof(struct job));
154 if (r == -1) return twarn("read()");
155 if (r < sizeof(struct job)) {
156 return binlog_warn("job record is too short", fd, path);
159 j = job_find(js.id);
160 switch (js.state) {
161 case JOB_STATE_INVALID:
162 if (j) {
163 job_remove(j);
164 binlog_dref(j->binlog);
165 job_free(j);
166 j = NULL;
168 break;
169 case JOB_STATE_READY:
170 case JOB_STATE_DELAYED:
171 if (!j) {
172 t = tube_find_or_make(tubename);
173 j = make_job_with_id(js.pri, js.delay, js.ttr, js.body_size,
174 t, js.id);
175 j->next = j->prev = j;
176 j->creation = js.creation;
177 job_insert(binlog_jobs, j);
178 r = read(fd, j->body, js.body_size);
179 if (r == -1) return twarn("read()");
180 if (r < js.body_size) {
181 warnx("dropping incomplete job %llu", j->id);
182 job_remove(j);
183 binlog_dref(j->binlog);
184 job_free(j);
185 return binlog_warn("job body is too short", fd, path);
188 break;
190 if (j) {
191 j->state = js.state;
192 j->deadline = js.deadline;
193 j->pri = js.pri;
194 j->delay = js.delay;
195 j->ttr = js.ttr;
196 j->timeout_ct = js.timeout_ct;
197 j->release_ct = js.release_ct;
198 j->bury_ct = js.bury_ct;
199 j->kick_ct = js.kick_ct;
201 /* this is a complete record, so we can move the binlog ref */
202 if (namelen && js.body_size) {
203 binlog_dref(j->binlog);
204 j->binlog = binlog_iref(last_binlog);
210 void
211 binlog_close()
213 if (binlog_fd < 0) return;
214 close(binlog_fd);
215 binlog_dref(last_binlog);
216 binlog_fd = -1;
219 static binlog
220 add_binlog(char *path)
222 binlog b;
224 b = (binlog)malloc(sizeof(struct binlog) + strlen(path) + 1);
225 if (!b) return twarnx("OOM"), NULL;
226 strcpy(b->path, path);
227 b->refs = 0;
228 b->next = NULL;
229 if (last_binlog) last_binlog->next = b;
230 last_binlog = b;
231 if (!first_binlog) first_binlog = b;
233 return b;
236 static int
237 binlog_open()
239 char path[PATH_MAX];
240 binlog b;
241 int fd, r;
243 if (!binlog_dir) return -1;
244 r = snprintf(path, PATH_MAX, "%s/binlog.%d", binlog_dir, ++binlog_index);
245 if (r > PATH_MAX) return twarnx("path too long: %s", binlog_dir), -1;
247 if (!binlog_iref(add_binlog(path))) return -1;
248 fd = open(path, O_WRONLY | O_CREAT, 0400);
250 if (fd < 0) {
251 twarn("Cannot open binlog %s", path);
252 return -1;
256 bytes_written = write(fd, &binlog_version, sizeof(int));
258 if (bytes_written < sizeof(int)) {
259 twarn("Cannot write to binlog");
260 close(fd);
261 binlog_dref(last_binlog);
262 return -1;
265 return fd;
268 static void
269 binlog_open_next()
271 if (binlog_fd < 0) return;
272 close(binlog_fd);
273 binlog_dref(last_binlog);
274 binlog_fd = binlog_open();
277 void
278 binlog_write_job(job j)
280 size_t tube_namelen, to_write;
281 struct iovec vec[4], *vptr;
282 int vcnt = 3;
284 if (binlog_fd < 0) return;
285 tube_namelen = 0;
287 vec[0].iov_base = (char *) &tube_namelen;
288 vec[0].iov_len = sizeof(size_t);
289 to_write = sizeof(size_t);
291 vec[1].iov_base = j->tube->name;
292 vec[1].iov_len = 0;
294 /* we could save some bytes in the binlog file by only saving some parts of
295 * the job struct */
296 vec[2].iov_base = (char *) j;
297 vec[2].iov_len = sizeof(struct job);
298 to_write += sizeof(struct job);
300 if (j->state == JOB_STATE_READY || j->state == JOB_STATE_DELAYED) {
301 if (!j->binlog) {
302 tube_namelen = strlen(j->tube->name);
303 vec[1].iov_len = tube_namelen;
304 to_write += tube_namelen;
305 vcnt = 4;
306 vec[3].iov_base = j->body;
307 vec[3].iov_len = j->body_size;
308 to_write += j->body_size;
310 } else if (j->state == JOB_STATE_INVALID) {
311 if (j->binlog) binlog_dref(j->binlog);
312 j->binlog = NULL;
313 } else {
314 return twarnx("unserializable job state: %d", j->state);
317 if ((bytes_written + to_write) > binlog_size_limit) binlog_open_next();
318 if (binlog_fd < 0) return;
320 if (j->state && !j->binlog) j->binlog = binlog_iref(last_binlog);
322 while (to_write > 0) {
323 size_t written = writev(binlog_fd, vec, vcnt);
325 if (written < 0) {
326 twarn("Cannot write to binlog");
327 binlog_close();
328 return;
331 bytes_written += written;
332 to_write -= written;
333 if (to_write > 0 && written > 0) {
334 for (vptr = vec; written >= vptr->iov_len; vptr++) {
335 written -= vptr->iov_len;
336 vptr->iov_len = 0;
338 vptr->iov_base = (char *) vptr->iov_base + written;
339 vptr->iov_len -= written;
345 void
346 binlog_read(job binlog_jobs)
348 int binlog_index_min;
349 struct stat sbuf;
350 int fd, idx, r;
351 char path[PATH_MAX];
352 binlog b;
354 if (!binlog_dir) return;
356 if (stat(binlog_dir, &sbuf) < 0) {
357 if (mkdir(binlog_dir, 0700) < 0) return twarn("%s", binlog_dir);
358 } else if (!(sbuf.st_mode & S_IFDIR)) {
359 twarnx("%s", binlog_dir);
360 return;
363 binlog_index_min = binlog_scan_dir();
365 if (binlog_index_min) {
366 for (idx = binlog_index_min; idx <= binlog_index; idx++) {
367 r = snprintf(path, PATH_MAX, "%s/binlog.%d", binlog_dir, idx);
368 if (r > PATH_MAX) return twarnx("path too long: %s", binlog_dir);
370 fd = open(path, O_RDONLY);
372 if (fd < 0) {
373 twarn("%s", path);
374 } else {
375 b = binlog_iref(add_binlog(path));
376 binlog_read_one(fd, binlog_jobs, path);
377 close(fd);
378 binlog_dref(b);
386 binlog_lock()
388 int r;
389 struct flock lock;
390 char path[PATH_MAX];
392 r = snprintf(path, PATH_MAX, "%s/lock", binlog_dir);
393 if (r > PATH_MAX) return twarnx("path too long: %s", binlog_dir), 0;
395 lock_fd = open(path, O_WRONLY|O_CREAT, 0600);
396 if (lock_fd == -1) return twarn("open"), 0;
398 lock.l_type = F_WRLCK;
399 lock.l_whence = SEEK_SET;
400 lock.l_start = 0;
401 lock.l_len = 0;
402 r = fcntl(lock_fd, F_SETLK, &lock);
403 if (r) return twarn("fcntl"), 0;
405 return 1;
408 void
409 binlog_init()
411 binlog_fd = binlog_open();
414 const char *
415 binlog_oldest_index()
417 if (!first_binlog) return "0";
419 return strrchr(first_binlog->path, '.') + 1;
422 const char *
423 binlog_current_index()
425 if (!last_binlog) return "0";
427 return strrchr(last_binlog->path, '.') + 1;